From commits-return-49343-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Wed Jan 8 10:15:11 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9AE6A180607 for ; Wed, 8 Jan 2020 11:15:10 +0100 (CET) Received: (qmail 14941 invoked by uid 500); 8 Jan 2020 10:15:09 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 14932 invoked by uid 99); 8 Jan 2020 10:15:09 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jan 2020 10:15:09 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 9638B8D80D; Wed, 8 Jan 2020 10:15:09 +0000 (UTC) Date: Wed, 08 Jan 2020 10:15:09 +0000 To: "commits@qpid.apache.org" Subject: [qpid-broker-j] branch 7.1.x updated: QPID-8398: [Broker-J][AMQP 0-9] Use SoftReferences to access decoded message heades in order to GC them before running into OutOfMemory conditions MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157847850954.2759.6392771712402798972@gitbox.apache.org> From: orudyy@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: qpid-broker-j X-Git-Refname: refs/heads/7.1.x X-Git-Reftype: branch X-Git-Oldrev: 5209244dc8223c8dacf8fd2048baf30edca63518 X-Git-Newrev: 664d6f21ae73bd97c42ead00b7e99d6242d4a21a X-Git-Rev: 664d6f21ae73bd97c42ead00b7e99d6242d4a21a X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 7.1.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git The following commit(s) were added to refs/heads/7.1.x by this push: new 664d6f2 QPID-8398: [Broker-J][AMQP 0-9] Use SoftReferences to access decoded message heades in order to GC them before running into OutOfMemory conditions 664d6f2 is described below commit 664d6f21ae73bd97c42ead00b7e99d6242d4a21a Author: Alex Rudyy AuthorDate: Wed Jan 8 00:32:58 2020 +0000 QPID-8398: [Broker-J][AMQP 0-9] Use SoftReferences to access decoded message heades in order to GC them before running into OutOfMemory conditions --- .../apache/qpid/server/protocol/v0_8/AMQType.java | 10 +- .../qpid/server/protocol/v0_8/EncodingUtils.java | 2 +- .../qpid/server/protocol/v0_8/FieldTable.java | 587 +++++++++++++-------- .../qpid/server/protocol/v0_8/FieldTableTest.java | 21 +- .../qpid/server/protocol/v0_8/MessageMetaData.java | 12 +- .../protocol/v0_8/transport/BasicConsumeBody.java | 12 +- .../transport/BasicContentHeaderProperties.java | 86 ++- .../protocol/v0_8/transport/ChannelAlertBody.java | 4 +- .../v0_8/transport/ConnectionStartBody.java | 8 +- .../v0_8/transport/ConnectionStartOkBody.java | 9 +- .../v0_8/transport/ExchangeDeclareBody.java | 4 +- .../protocol/v0_8/transport/QueueBindBody.java | 4 +- .../protocol/v0_8/transport/QueueDeclareBody.java | 4 +- .../protocol/v0_8/transport/QueueUnbindBody.java | 4 +- .../qpid/server/protocol/v0_8/AMQDecoderTest.java | 2 +- .../PropertyConverter_Internal_to_v0_8Test.java | 5 +- .../BasicContentHeaderPropertiesTest.java | 22 +- .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java | 6 +- .../PropertyConverter_0_10_to_0_8Test.java | 7 +- .../v0_8_v1_0/MessageConverter_0_8_to_1_0.java | 3 +- .../PropertyConverter_1_0_to_0_8Test.java | 13 +- .../apache/qpid/tests/protocol/v0_8/BasicTest.java | 5 +- .../qpid/tests/protocol/v0_8/LargeHeadersTest.java | 4 +- .../v0_8/extension/basic/MalformedMessage.java | 2 +- .../protocol/v0_8/extension/queue/QueueTest.java | 3 +- 25 files changed, 527 insertions(+), 312 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQType.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQType.java index 171f4d8..42dd0aa 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQType.java +++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQType.java @@ -304,15 +304,7 @@ public enum AMQType @Override public Object readValueFromBuffer(QpidByteBuffer buffer) { - try - { - // Read size of field table then all name/value pairs. - return EncodingUtils.readFieldTable(buffer); - } - catch (AMQFrameDecodingException e) - { - throw new IllegalArgumentException("Unable to read field table from buffer.", e); - } + return EncodingUtils.readFieldTable(buffer); } @Override diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/EncodingUtils.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/EncodingUtils.java index 5039552..0b9ff41 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/EncodingUtils.java +++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/EncodingUtils.java @@ -204,7 +204,7 @@ public class EncodingUtils } } - public static FieldTable readFieldTable(QpidByteBuffer input) throws AMQFrameDecodingException + public static FieldTable readFieldTable(QpidByteBuffer input) { long length = input.getUnsignedInt(); if (length == 0) diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java index 3c14ebb..b5ad4db 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java +++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.lang.ref.SoftReference; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -32,6 +33,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,130 +43,50 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer; public class FieldTable { private static final Logger LOGGER = LoggerFactory.getLogger(FieldTable.class); - private static final String STRICT_AMQP_NAME = "STRICT_AMQP"; - private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false")); - private static final AMQTypedValue NOT_PRESENT = AMQType.VOID.asTypedValue(null); + private static final String STRICT_AMQP_NAME = "_strictAMQP"; + static boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false")); - public static final FieldTable EMPTY = FieldTable.convertToFieldTable(Collections.emptyMap()); - - private QpidByteBuffer _encodedForm; - private boolean _decoded; - private final Map _properties; - private final long _encodedSize; - private final boolean _strictAMQP; + private final FieldTableSupport _fieldTableSupport; FieldTable(QpidByteBuffer input, int len) { - _strictAMQP = STRICT_AMQP; - _encodedForm = input.view(0,len); - input.position(input.position()+len); - _encodedSize = len; - _properties = new LinkedHashMap<>(); + final QpidByteBuffer encodedForm = input.view(0, len); + input.position(input.position() + len); + _fieldTableSupport = new ByteBufferFieldTableSupport(encodedForm); } FieldTable(QpidByteBuffer buffer) { - _strictAMQP = STRICT_AMQP; - _encodedForm = buffer.duplicate(); - _encodedSize = buffer.remaining(); - _properties = new LinkedHashMap<>(); + _fieldTableSupport = new ByteBufferFieldTableSupport(buffer.duplicate()); } FieldTable(Map properties) { - this(properties, STRICT_AMQP); - } - - FieldTable(Map properties, boolean strictAMQP) - { - _strictAMQP = strictAMQP; - long size = 0; - Map m = new LinkedHashMap<>(); + final Map m; if (properties != null && !properties.isEmpty()) { - m = new LinkedHashMap<>(); - for (Map.Entry e : properties.entrySet()) - { - String key = e.getKey(); - Object val = e.getValue(); - checkPropertyName(key); - AMQTypedValue value = getAMQTypeValue(val); - size += EncodingUtils.encodedShortStringLength(e.getKey()) + 1 + value.getEncodingSize(); - m.put(e.getKey(), value); - } + m = properties.entrySet() + .stream() + .peek(e -> checkPropertyName(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> getAMQTypeValue(e.getValue()), + (x, y) -> y, + LinkedHashMap::new)); } - _properties = m; - _encodedSize = size; - _decoded = true; - } - - private synchronized AMQTypedValue getProperty(String key) - { - AMQTypedValue value = _properties.get(key); - if (value == null && !_decoded) + else { - value = findValueForKey(key); - _properties.put(key, value); + m = Collections.emptyMap(); } - return value; - } - - private Map decode() - { - final Map properties = new HashMap<>(); - if (_encodedSize > 0 && _encodedForm != null) - { - _encodedForm.mark(); - try - { - do - { - final String key = AMQShortString.readAMQShortStringAsString(_encodedForm); - - checkPropertyName(key); - AMQTypedValue value = AMQTypedValue.readFromBuffer(_encodedForm); - properties.put(key, value); - } - while (_encodedForm.hasRemaining()); - } - finally - { - _encodedForm.reset(); - } - final long recalculateEncodedSize = recalculateEncodedSize(properties); - if (_encodedSize != recalculateEncodedSize) - { - throw new IllegalStateException(String.format( - "Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'", - _encodedSize, - recalculateEncodedSize)); - } - } - return properties; + _fieldTableSupport = new MapFieldTableSupport(m); } - private void decodeIfNecessary() + FieldTable(FieldTableSupport fieldTableSupport) { - if (!_decoded) - { - try - { - final Map properties = decode(); - if (!_properties.isEmpty()) - { - _properties.clear(); - } - _properties.putAll(properties); - } - finally - { - _decoded = true; - } - } + _fieldTableSupport = new MapFieldTableSupport(fieldTableSupport.getAsMap()); } - private AMQTypedValue getAMQTypeValue(final Object object) throws AMQPInvalidClassException + private static AMQTypedValue getAMQTypeValue(final Object object) throws AMQPInvalidClassException { if (object == null) { @@ -254,15 +176,13 @@ public class FieldTable throw new AMQPInvalidClassException(AMQPInvalidClassException.INVALID_OBJECT_MSG + object.getClass()); } - // ***** Methods - @Override public String toString() { - return getProperties().toString(); + return _fieldTableSupport.toString(); } - private void checkPropertyName(String propertyName) + private static void checkPropertyName(String propertyName) { if (propertyName == null) { @@ -307,47 +227,33 @@ public class FieldTable } } - // ************************* Byte Buffer Processing - - public synchronized void writeToBuffer(QpidByteBuffer buffer) + public void writeToBuffer(QpidByteBuffer buffer) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); - if (_decoded) + if (_fieldTableSupport instanceof MapFieldTableSupport) { - LOGGER.debug(getProperties().toString()); + LOGGER.debug(_fieldTableSupport.toString()); } } buffer.putUnsignedInt(getEncodedSize()); - putDataInBuffer(buffer); + _fieldTableSupport.writeToBuffer(buffer); } - public synchronized byte[] getDataAsBytes() + public byte[] getDataAsBytes() { - if (_encodedForm == null) - { - byte[] data = new byte[(int) getEncodedSize()]; - QpidByteBuffer buf = QpidByteBuffer.wrap(data); - putDataInBuffer(buf); - return data; - } - else - { - byte[] encodedCopy = new byte[_encodedForm.remaining()]; - _encodedForm.copyTo(encodedCopy); - return encodedCopy; - } + return _fieldTableSupport.getAsBytes(); } public long getEncodedSize() { - return _encodedSize; + return _fieldTableSupport.getEncodedSize(); } - private synchronized long recalculateEncodedSize(final Map properties) + private static long recalculateEncodedSize(final Map properties) { long size = 0L; for (Map.Entry e : properties.entrySet()) @@ -361,10 +267,9 @@ public class FieldTable public static Map convertToMap(final FieldTable fieldTable) { - final Map map = new HashMap<>(); - if (fieldTable != null) { + final Map map = new LinkedHashMap<>(); Map properties = fieldTable.getProperties(); if (properties != null) { @@ -382,34 +287,14 @@ public class FieldTable map.put(e.getKey(), val); } } + return map; } - return map; - } - - public synchronized void clearEncodedForm() - { - try - { - decodeIfNecessary(); - } - finally - { - if (_encodedForm != null) - { - _encodedForm.dispose(); - _encodedForm = null; - } - } + return Collections.emptyMap(); } - public synchronized void dispose() + public void dispose() { - if (_encodedForm != null) - { - _encodedForm.dispose(); - _encodedForm = null; - } - _properties.clear(); + _fieldTableSupport.dispose(); } public int size() @@ -419,12 +304,12 @@ public class FieldTable public boolean isEmpty() { - return size() == 0; + return getEncodedSize() > 0; } public boolean containsKey(String key) { - return getProperties().containsKey(key); + return _fieldTableSupport.containsKey(key); } public Set keys() @@ -435,10 +320,42 @@ public class FieldTable public Object get(String key) { checkPropertyName(key); - AMQTypedValue value = getProperty(key); - if (value != null && value != NOT_PRESENT) + return _fieldTableSupport.get(key); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { - return value.getValue(); + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final FieldTable that = (FieldTable) o; + + return _fieldTableSupport.equals(that._fieldTableSupport); + } + + @Override + public int hashCode() + { + return _fieldTableSupport.hashCode(); + } + + private Map getProperties() + { + return _fieldTableSupport.getAsMap(); + } + + public static FieldTable convertToFieldTable(Map map) + { + if (map != null) + { + return new FieldTable(map); } else { @@ -446,99 +363,345 @@ public class FieldTable } } - private void putDataInBuffer(QpidByteBuffer buffer) + public static FieldTable convertToDecodedFieldTable(final FieldTable fieldTable) { - if (_encodedForm != null) + if (fieldTable == null) + { + return null; + } + + return new FieldTable(fieldTable._fieldTableSupport); + } + + public void validate() + { + _fieldTableSupport.validate(); + } + + interface FieldTableSupport + { + Object get(String key); + + boolean containsKey(String key); + + long getEncodedSize(); + + void writeToBuffer(QpidByteBuffer buffer); + + byte[] getAsBytes(); + + Map getAsMap(); + + void dispose(); + + void validate(); + } + + static class ByteBufferFieldTableSupport implements FieldTableSupport + { + private static final AMQTypedValue NOT_PRESENT = AMQType.VOID.asTypedValue(null); + + private final QpidByteBuffer _encodedForm; + private volatile SoftReference> _cache; + + ByteBufferFieldTableSupport(final QpidByteBuffer encodedForm) + { + _encodedForm = encodedForm; + _cache = new SoftReference<>(new LinkedHashMap<>()); + } + + @Override + public synchronized long getEncodedSize() + { + return _encodedForm.remaining(); + } + + @Override + public synchronized Object get(final String key) + { + final AMQTypedValue value = getValue(key); + if (value != null && value != NOT_PRESENT) + { + return value.getValue(); + } + else + { + return null; + } + } + + @Override + public boolean containsKey(final String key) + { + final AMQTypedValue value = getValue(key); + return value != null && value != NOT_PRESENT; + } + + @Override + public synchronized void writeToBuffer(final QpidByteBuffer buffer) { byte[] encodedCopy = new byte[_encodedForm.remaining()]; _encodedForm.copyTo(encodedCopy); - buffer.put(encodedCopy); } - else if (!_properties.isEmpty()) + + @Override + public synchronized byte[] getAsBytes() + { + byte[] encodedCopy = new byte[_encodedForm.remaining()]; + _encodedForm.copyTo(encodedCopy); + return encodedCopy; + } + + @Override + public Map getAsMap() { - for (final Map.Entry me : _properties.entrySet()) + return decode(); + } + + @Override + public synchronized void dispose() + { + if (_encodedForm != null) { - EncodingUtils.writeShortStringBytes(buffer, me.getKey()); - me.getValue().writeToBuffer(buffer); + _encodedForm.dispose(); + _cache.clear(); } } - } + @Override + public void validate() + { + decode(); + } - @Override - public int hashCode() - { - return getProperties().hashCode(); - } - - @Override - public boolean equals(Object o) - { - if (o == this) + @Override + public boolean equals(final Object o) { - return true; + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final ByteBufferFieldTableSupport that = (ByteBufferFieldTableSupport) o; + + return _encodedForm.equals(that._encodedForm); } - if (o == null || getClass() != o.getClass()) + @Override + public int hashCode() { - return false; + return _encodedForm.hashCode(); } - FieldTable f = (FieldTable) o; - return getProperties().equals(f.getProperties()); - } + @Override + public String toString() + { + return getAsMap().toString(); + } - private synchronized Map getProperties() - { - decodeIfNecessary(); - return _properties; - } - private AMQTypedValue findValueForKey(String key) - { - byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - _encodedForm.mark(); - try + private synchronized AMQTypedValue getValue(final String key) { - while (_encodedForm.hasRemaining()) + AMQTypedValue value = null; + Map properties = _cache.get(); + if (properties == null) + { + _cache = new SoftReference<>(new LinkedHashMap<>()); + properties = _cache.get(); + } + if (properties != null) { - final byte[] bytes = AMQShortString.readAMQShortStringAsBytes(_encodedForm); - if (Arrays.equals(keyBytes, bytes)) + value = properties.get(key); + } + if (value == null) + { + value = findValueForKey(key); + if (value == null) { - return AMQTypedValue.readFromBuffer(_encodedForm); + value = NOT_PRESENT; } - else + if (properties != null) { - AMQType type = AMQTypeMap.getType(_encodedForm.get()); - type.skip(_encodedForm); + properties.put(key, value); } } + return value; } - finally + + private AMQTypedValue findValueForKey(String key) { - _encodedForm.reset(); + byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + _encodedForm.mark(); + try + { + while (_encodedForm.hasRemaining()) + { + final byte[] bytes = AMQShortString.readAMQShortStringAsBytes(_encodedForm); + if (Arrays.equals(keyBytes, bytes)) + { + return AMQTypedValue.readFromBuffer(_encodedForm); + } + else + { + AMQType type = AMQTypeMap.getType(_encodedForm.get()); + type.skip(_encodedForm); + } + } + } + finally + { + _encodedForm.reset(); + } + return null; + } + + private synchronized Map decode() + { + final Map properties = new HashMap<>(); + final long encodedSize = getEncodedSize(); + if (encodedSize > 0) + { + _encodedForm.mark(); + try + { + do + { + final String key = AMQShortString.readAMQShortStringAsString(_encodedForm); + + checkPropertyName(key); + AMQTypedValue value = AMQTypedValue.readFromBuffer(_encodedForm); + properties.put(key, value); + } + while (_encodedForm.hasRemaining()); + } + finally + { + _encodedForm.reset(); + } + + final long recalculateEncodedSize = recalculateEncodedSize(properties); + if (encodedSize != recalculateEncodedSize) + { + throw new IllegalStateException(String.format( + "Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'", + encodedSize, + recalculateEncodedSize)); + } + } + return properties; } - return NOT_PRESENT; } - public static FieldTable convertToFieldTable(Map map) + static class MapFieldTableSupport implements FieldTableSupport { - if (map != null) + private final Map _properties; + private final long _encodedSize; + + MapFieldTableSupport(final Map properties) { - return new FieldTable(map); + _properties = Collections.unmodifiableMap(new LinkedHashMap<>(properties)); + _encodedSize = recalculateEncodedSize(properties); } - else + + @Override + public long getEncodedSize() { - return null; + return _encodedSize; } - } - public synchronized void validate() - { - if (!_decoded) + @Override + public Object get(final String key) { - decode(); + final AMQTypedValue value = _properties.get(key); + if (value == null) + { + return null; + } + return value.getValue(); + } + + @Override + public boolean containsKey(final String key) + { + return _properties.containsKey(key); + } + + @Override + public void writeToBuffer(final QpidByteBuffer buffer) + { + for (final Map.Entry me : _properties.entrySet()) + { + EncodingUtils.writeShortStringBytes(buffer, me.getKey()); + me.getValue().writeToBuffer(buffer); + } + } + + @Override + public byte[] getAsBytes() + { + byte[] data = new byte[(int) getEncodedSize()]; + QpidByteBuffer buf = QpidByteBuffer.wrap(data); + writeToBuffer(buf); + return data; + } + + @Override + public Map getAsMap() + { + return _properties; + } + + @Override + public void dispose() + { + // noop + } + + @Override + public void validate() + { + // noop + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final MapFieldTableSupport that = (MapFieldTableSupport) o; + + if (_encodedSize != that._encodedSize) + { + return false; + } + return _properties.equals(that._properties); + } + + @Override + public int hashCode() + { + int result = _properties.hashCode(); + result = 31 * result + (int) (_encodedSize ^ (_encodedSize >>> 32)); + return result; + } + + @Override + public String toString() + { + return _properties.toString(); } } + } diff --git a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java index 4a841eb..ecfa8d2 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java @@ -417,7 +417,7 @@ public class FieldTableTest extends UnitTestBase * Additional test checkPropertyName doesn't accept Null */ @Test - public void testCheckPropertyNameaIsNull() + public void testCheckPropertyNameIsNull() { try { @@ -434,7 +434,7 @@ public class FieldTableTest extends UnitTestBase * Additional test checkPropertyName doesn't accept an empty String */ @Test - public void testCheckPropertyNameaIsEmptyString() + public void testCheckPropertyNameIsEmptyString() { try { @@ -460,15 +460,21 @@ public class FieldTableTest extends UnitTestBase longPropertyName.append("x"); } + boolean strictAMQP = FieldTable._strictAMQP; try { - new FieldTable(Collections.singletonMap(longPropertyName.toString(), "String"), true); + FieldTable._strictAMQP = true; + new FieldTable(Collections.singletonMap(longPropertyName.toString(), "String")); fail("property name must be < 128 characters"); } catch (IllegalArgumentException iae) { // normal path } + finally + { + FieldTable._strictAMQP = strictAMQP; + } } /** @@ -477,16 +483,23 @@ public class FieldTableTest extends UnitTestBase @Test public void testCheckPropertyNameStartCharacterIsLetter() { + boolean strictAMQP = FieldTable._strictAMQP; + // Try a name that starts with a number try { - new FieldTable(Collections.singletonMap("1", "String"), true); + FieldTable._strictAMQP = true; + new FieldTable(Collections.singletonMap("1", "String")); fail("property name must start with a letter"); } catch (IllegalArgumentException iae) { // normal path } + finally + { + FieldTable._strictAMQP = strictAMQP; + } } /** diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java index 88e879c..869fd9b 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java @@ -286,17 +286,16 @@ public class MessageMetaData implements StorableMessageMetaData @Override public Object getHeader(String name) { - FieldTable ft = getProperties().getHeaders(); - return ft.get(name); + return getProperties().getHeader(name); } @Override public boolean containsHeaders(Set names) { - FieldTable ft = getProperties().getHeaders(); + final BasicContentHeaderProperties properties = getProperties(); for(String name : names) { - if(!ft.containsKey(name)) + if(!properties.containsHeader(name)) { return false; } @@ -307,14 +306,13 @@ public class MessageMetaData implements StorableMessageMetaData @Override public Collection getHeaderNames() { - return getProperties().getHeaders().keys(); + return getProperties().getHeaderNames(); } @Override public boolean containsHeader(String name) { - FieldTable ft = getProperties().getHeaders(); - return ft.containsKey(name); + return getProperties().containsHeader(name); } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicConsumeBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicConsumeBody.java index 4249e51..a8dfa27 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicConsumeBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicConsumeBody.java @@ -203,13 +203,19 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD boolean exclusive = (bitfield & 0x04) == 0x04; boolean nowait = (bitfield & 0x08) == 0x08; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - if(!dispatcher.ignoreAllButCloseOk()) + if (!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveBasicConsume(queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + dispatcher.receiveBasicConsume(queue, + consumerTag, + noLocal, + noAck, + exclusive, + nowait, + FieldTable.convertToDecodedFieldTable(arguments)); } if (arguments != null) { - arguments.clearEncodedForm(); + arguments.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java index 2a591ef..75efa7a 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol.v0_8.transport; +import java.util.Collection; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +48,7 @@ public class BasicContentHeaderProperties private AMQShortString _encoding; - private FieldTable _headers = FieldTable.EMPTY; + private FieldTable _headers; private byte _deliveryMode; @@ -85,12 +88,12 @@ public class BasicContentHeaderProperties private static final int APPLICATION_ID_MASK = 1 << 3; private static final int CLUSTER_ID_MASK = 1 << 2; - private QpidByteBuffer _encodedForm; + private volatile QpidByteBuffer _encodedForm; public BasicContentHeaderProperties(BasicContentHeaderProperties other) { - _headers = FieldTableFactory.createFieldTable(FieldTable.convertToMap(other.getHeaders())); + _headers = FieldTableFactory.createFieldTable(other.getHeadersAsMap()); _contentType = other._contentType; @@ -375,13 +378,7 @@ public class BasicContentHeaderProperties if ((_propertyFlags & HEADERS_MASK) != 0) { - long length = buffer.getUnsignedInt(); - - try (QpidByteBuffer buf = buffer.view(0, (int) length)) - { - _headers = FieldTableFactory.createFieldTable(buf); - } - buffer.position(buffer.position()+(int)length); + _headers = EncodingUtils.readFieldTable(buffer); } if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) @@ -502,9 +499,24 @@ public class BasicContentHeaderProperties nullEncodedForm(); } - public FieldTable getHeaders() + public synchronized Object getHeader(String name) + { + return _headers == null ? null : _headers.get(name); + } + + public synchronized Collection getHeaderNames() + { + return getHeadersAsMap().keySet(); + } + + public synchronized boolean containsHeader(String name) { - return _headers; + return _headers != null && _headers.containsKey(name); + } + + public synchronized Map getHeadersAsMap() + { + return FieldTable.convertToMap(_headers); } public synchronized void setHeaders(FieldTable headers) @@ -517,7 +529,7 @@ public class BasicContentHeaderProperties { _propertyFlags |= HEADERS_MASK; } - _headers = headers == null ? FieldTable.EMPTY : headers; + _headers = headers; nullEncodedForm(); } @@ -809,13 +821,21 @@ public class BasicContentHeaderProperties public synchronized void dispose() { nullEncodedForm(); - _headers.dispose(); + if (_headers != null) + { + _headers.dispose(); + } } public synchronized void clearEncodedForm() { nullEncodedForm(); - _headers.clearEncodedForm(); + if (_headers != null) + { + final FieldTable headers = FieldTable.convertToDecodedFieldTable(_headers); + _headers.dispose(); + _headers = headers; + } } private synchronized void nullEncodedForm() @@ -832,13 +852,45 @@ public class BasicContentHeaderProperties if (_encodedForm != null) { _encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm); + + if (_headers != null) + { + _headers.dispose(); + _headers = null; + } + rebuildHeadersIfHeadersMaskIsSet(); + } + } + + private void rebuildHeadersIfHeadersMaskIsSet() + { + try (QpidByteBuffer byteBuffer = _encodedForm.slice()) + { + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + int contentTypeSize = byteBuffer.getUnsignedByte(); + byteBuffer.position(byteBuffer.position() + contentTypeSize); + } + + if ((_propertyFlags & ENCODING_MASK) != 0) + { + int encodingSize = byteBuffer.getUnsignedByte(); + byteBuffer.position(byteBuffer.position() + encodingSize); + } + + if ((_propertyFlags & HEADERS_MASK) != 0) + { + _headers = EncodingUtils.readFieldTable(byteBuffer); + } } - _headers.clearEncodedForm(); } public synchronized void validate() { - _headers.validate(); + if (_headers != null) + { + _headers.validate(); + } } public boolean checkValid() diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ChannelAlertBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ChannelAlertBody.java index 91dfc69..0964d0e 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ChannelAlertBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ChannelAlertBody.java @@ -130,11 +130,11 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD FieldTable details = EncodingUtils.readFieldTable(buffer); if(!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveChannelAlert(replyCode, replyText, details); + dispatcher.receiveChannelAlert(replyCode, replyText, FieldTable.convertToDecodedFieldTable(details)); } if (details != null) { - details.clearEncodedForm(); + details.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartBody.java index 7e3010b..a5ad772 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartBody.java @@ -152,11 +152,15 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA if(!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + dispatcher.receiveConnectionStart(versionMajor, + versionMinor, + FieldTable.convertToDecodedFieldTable(serverProperties), + mechanisms, + locales); } if (serverProperties != null) { - serverProperties.clearEncodedForm(); + serverProperties.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartOkBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartOkBody.java index 92e51e7..2a6b7fb 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartOkBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ConnectionStartOkBody.java @@ -141,13 +141,16 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl AMQShortString mechanism = AMQShortString.readAMQShortString(in); byte[] response = EncodingUtils.readBytes(in); AMQShortString locale = AMQShortString.readAMQShortString(in); - if(!dispatcher.ignoreAllButCloseOk()) + if (!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); + dispatcher.receiveConnectionStartOk(FieldTable.convertToDecodedFieldTable(clientProperties), + mechanism, + response, + locale); } if (clientProperties != null) { - clientProperties.clearEncodedForm(); + clientProperties.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ExchangeDeclareBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ExchangeDeclareBody.java index b940a26..8f3265d 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ExchangeDeclareBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ExchangeDeclareBody.java @@ -224,11 +224,11 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA autoDelete, internal, nowait, - arguments); + FieldTable.convertToDecodedFieldTable(arguments)); } if (arguments != null) { - arguments.clearEncodedForm(); + arguments.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueBindBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueBindBody.java index 3d3cc90..bafcf9c 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueBindBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueBindBody.java @@ -173,11 +173,11 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData FieldTable arguments = EncodingUtils.readFieldTable(buffer); if(!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, arguments); + dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, FieldTable.convertToDecodedFieldTable(arguments)); } if (arguments != null) { - arguments.clearEncodedForm(); + arguments.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueDeclareBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueDeclareBody.java index 3a9a966..9df4634 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueDeclareBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueDeclareBody.java @@ -205,11 +205,11 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD FieldTable arguments = EncodingUtils.readFieldTable(buffer); if(!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments); + dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, FieldTable.convertToDecodedFieldTable(arguments)); } if (arguments != null) { - arguments.clearEncodedForm(); + arguments.dispose(); } } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueUnbindBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueUnbindBody.java index c45c477..0cba587 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueUnbindBody.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/QueueUnbindBody.java @@ -155,11 +155,11 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa FieldTable arguments = EncodingUtils.readFieldTable(buffer); if(!dispatcher.ignoreAllButCloseOk()) { - dispatcher.receiveQueueUnbind(queue, exchange, routingKey, arguments); + dispatcher.receiveQueueUnbind(queue, exchange, routingKey, FieldTable.convertToDecodedFieldTable(arguments)); } if (arguments != null) { - arguments.clearEncodedForm(); + arguments.dispose(); } } diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQDecoderTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQDecoderTest.java index 5161bb3..a69975e 100644 --- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQDecoderTest.java +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQDecoderTest.java @@ -117,7 +117,7 @@ public class AMQDecoderTest extends UnitTestBase assertEquals((long) ContentHeaderBody.TYPE, (long) ((AMQFrame) firstFrame).getBodyFrame().getFrameType()); BasicContentHeaderProperties decodedProps = ((ContentHeaderBody)((AMQFrame)firstFrame).getBodyFrame()).getProperties(); - final FieldTable headers = decodedProps.getHeaders(); + final Map headers = decodedProps.getHeadersAsMap(); assertEquals("world", headers.get("hello")); } else diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java index 9adb968..b215864 100644 --- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java @@ -117,7 +117,7 @@ public class PropertyConverter_Internal_to_v0_8Test extends UnitTestBase } @Test - public void testExpirationConversion() throws InterruptedException + public void testExpirationConversion() { long ttl = 10000; long expiryTime = System.currentTimeMillis() + ttl; @@ -276,8 +276,7 @@ public class PropertyConverter_Internal_to_v0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); - Map convertedHeaders = - FieldTable.convertToMap(convertedMessage.getContentHeaderBody().getProperties().getHeaders()); + Map convertedHeaders = convertedMessage.getContentHeaderBody().getProperties().getHeadersAsMap(); assertEquals("Unexpected application properties", properties, new HashMap<>(convertedHeaders)); } diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java index 904bb72..ba25df7 100644 --- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java @@ -41,7 +41,6 @@ public class BasicContentHeaderPropertiesTest extends UnitTestBase private BasicContentHeaderProperties _testProperties; private FieldTable _testTable; private String _testString = "This is a test string"; - private int _testint = 666; /** * Currently only test setting/getting String, int and boolean props @@ -56,7 +55,7 @@ public class BasicContentHeaderPropertiesTest extends UnitTestBase { Map headers = new LinkedHashMap<>(); headers.put("TestString", _testString); - headers.put("Testint", _testint); + headers.put("Testint", Integer.MAX_VALUE); _testTable = FieldTableFactory.createFieldTable(headers); _testProperties = new BasicContentHeaderProperties(); @@ -119,7 +118,7 @@ public class BasicContentHeaderPropertiesTest extends UnitTestBase public void testSetGetHeaders() { _testProperties.setHeaders(_testTable); - assertEquals(_testTable, _testProperties.getHeaders()); + assertEquals(FieldTable.convertToMap(_testTable), _testProperties.getHeadersAsMap()); } @Test @@ -218,7 +217,7 @@ public class BasicContentHeaderPropertiesTest extends UnitTestBase private static final double SPARSITY_FRACTION = 0.5; @Test - public void testRellocate() throws Exception + public void testReallocate() throws Exception { try { @@ -229,7 +228,7 @@ public class BasicContentHeaderPropertiesTest extends UnitTestBase // set some test fields _testProperties.setContentType("text/plain"); _testProperties.setUserId("test"); - final Map headers = FieldTable.convertToMap(_testProperties.getHeaders()); + final Map headers = _testProperties.getHeadersAsMap(); final int propertyListSize = _testProperties.getPropertyListSize(); final int flags = _testProperties.getPropertyFlags(); @@ -243,21 +242,16 @@ public class BasicContentHeaderPropertiesTest extends UnitTestBase propertiesBuffer.flip(); BasicContentHeaderProperties testProperties = new BasicContentHeaderProperties(propertiesBuffer, flags, propertyListSize); - FieldTable headersBeforeReallocation = testProperties.getHeaders(); - assertEquals("Unexpected headers", - headers, - FieldTable.convertToMap(headersBeforeReallocation)); + final Map headersBeforeReallocation = testProperties.getHeadersAsMap(); + assertEquals("Unexpected headers", headers, headersBeforeReallocation); buffer.dispose(); assertTrue("Properties buffer should be sparse", propertiesBuffer.isSparse()); testProperties.reallocate(); - FieldTable headersAfterReallocation = testProperties.getHeaders(); - - assertEquals("Unexpected headers after re-allocation", - headers, - FieldTable.convertToMap(headersAfterReallocation)); + final Map headersAfterReallocation = testProperties.getHeadersAsMap(); + assertEquals("Unexpected headers after re-allocation", headers, headersAfterReallocation); } } } diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index 68c96c9..ae54d40 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.converter.v0_8_v0_10; import static java.nio.charset.StandardCharsets.UTF_8; import java.net.URISyntaxException; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -40,7 +41,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.url.AMQBindingURL; @@ -242,9 +242,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter appHeaders = FieldTable.convertToMap(fieldTable); + final Map appHeaders = new LinkedHashMap<>(properties.getHeadersAsMap()); if(properties.getType() != null) { diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java index a25398a..f94d9cd 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java @@ -50,7 +50,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; import org.apache.qpid.server.store.StoredMessage; @@ -110,7 +109,7 @@ public class PropertyConverter_0_10_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties properties = convertedMessage.getContentHeaderBody().getProperties(); - Map applicationProperties = FieldTable.convertToMap(properties.getHeaders()); + Map applicationProperties = properties.getHeadersAsMap(); assertEquals("Unexpected headers", headers, new HashMap<>(applicationProperties)); } @@ -127,7 +126,7 @@ public class PropertyConverter_0_10_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties properties = convertedMessage.getContentHeaderBody().getProperties(); - Map applicationProperties = FieldTable.convertToMap(properties.getHeaders()); + Map applicationProperties = properties.getHeadersAsMap(); assertEquals("Unexpected subject in application properties", testSubject, applicationProperties.get("qpid.subject")); @@ -408,7 +407,7 @@ public class PropertyConverter_0_10_to_0_8Test extends UnitTestBase BasicContentHeaderProperties properties = convertedMessage.getContentHeaderBody().getProperties(); assertEquals("Unexpected subject", type, properties.getType().toString()); - Map applicationProperties = FieldTable.convertToMap(properties.getHeaders()); + Map applicationProperties = properties.getHeadersAsMap(); assertFalse("Unexpected x-jms-type in application properties", applicationProperties.containsKey("x-jms-type")); diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java index 27b31b8..4015a3a 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.converter.MessageConversionException; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; @@ -133,7 +132,7 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0 applicationPropertiesMap = new LinkedHashMap<>(FieldTable.convertToMap(contentHeader.getHeaders())); + Map applicationPropertiesMap = new LinkedHashMap<>(contentHeader.getHeadersAsMap()); if(applicationPropertiesMap.containsKey("qpid.subject")) { diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java index 99a7015..1697004 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java @@ -48,7 +48,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.converter.MessageConversionException; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; @@ -130,7 +129,7 @@ public class PropertyConverter_1_0_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); - final Map headers = FieldTable.convertToMap(convertedProperties.getHeaders()); + final Map headers = convertedProperties.getHeadersAsMap(); assertEquals("Unexpected headers", properties, new HashMap<>(headers)); } @@ -146,7 +145,7 @@ public class PropertyConverter_1_0_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); - final Map headers = FieldTable.convertToMap(convertedProperties.getHeaders()); + final Map headers = convertedProperties.getHeadersAsMap(); assertEquals("Unexpected headers size", (long) properties.size(), (long) headers.size()); assertEquals("Unexpected headers", properties.get(key), UUID.fromString((String) headers.get(key))); } @@ -163,7 +162,7 @@ public class PropertyConverter_1_0_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); - final Map headers = FieldTable.convertToMap(convertedProperties.getHeaders()); + final Map headers = convertedProperties.getHeadersAsMap(); assertEquals("Unexpected headers size", (long) properties.size(), (long) headers.size()); assertEquals("Unexpected headers", properties.get(key), new Date((Long) headers.get(key))); } @@ -179,7 +178,7 @@ public class PropertyConverter_1_0_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); - final Map headers = FieldTable.convertToMap(convertedProperties.getHeaders()); + final Map headers = convertedProperties.getHeadersAsMap(); assertEquals("Unexpected qpid.subject is missing from headers", subject, headers.get("qpid.subject")); assertEquals("Unexpected type", subject, convertedProperties.getType().toString()); final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); @@ -724,7 +723,7 @@ public class PropertyConverter_1_0_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); - Map headers = FieldTable.convertToMap(convertedProperties.getHeaders()); + Map headers = convertedProperties.getHeadersAsMap(); assertEquals("Unexpected group-id", testGroupId, headers.get("JMSXGroupID")); } @@ -739,7 +738,7 @@ public class PropertyConverter_1_0_to_0_8Test extends UnitTestBase final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); - Map headers = FieldTable.convertToMap(convertedProperties.getHeaders()); + Map headers = convertedProperties.getHeadersAsMap(); assertEquals("Unexpected group-id", testGroupSequence, headers.get("JMSXGroupSeq")); } diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java index f497f05..993209d 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java @@ -41,7 +41,6 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.exchange.ExchangeDefaults; import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody; @@ -220,7 +219,7 @@ public class BasicTest extends BrokerAdminUsingTestBase assertThat(receivedContent, is(equalTo(messageContent))); - Map receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + Map receivedHeaders = new HashMap<>(properties.getHeadersAsMap()); assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders)))); assertThat(properties.getContentTypeAsString(), is(equalTo(messageContentType))); assertThat(properties.getDeliveryMode(), is(equalTo(deliveryMode))); @@ -279,7 +278,7 @@ public class BasicTest extends BrokerAdminUsingTestBase assertThat(header.getBodySize(), is(equalTo((long)messageContent.length()))); BasicContentHeaderProperties properties = header.getProperties(); - Map receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + Map receivedHeaders = new HashMap<>(properties.getHeadersAsMap()); assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders)))); assertThat(properties.getContentTypeAsString(), is(equalTo(messageContentType))); assertThat(properties.getPriority(), is(equalTo(priority))); diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java index cb01027..27bda7f 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java @@ -22,7 +22,6 @@ package org.apache.qpid.tests.protocol.v0_8; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import java.net.InetSocketAddress; @@ -34,7 +33,6 @@ import org.junit.Before; import org.junit.Test; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody; @@ -111,7 +109,7 @@ public class LargeHeadersTest extends BrokerAdminUsingTestBase assertThat(header.getBodySize(), is(equalTo(0L))); BasicContentHeaderProperties properties = header.getProperties(); - Map receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + Map receivedHeaders = new HashMap<>(properties.getHeadersAsMap()); assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders)))); } } diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java index ccea026..a7a38b8 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java @@ -212,7 +212,7 @@ public class MalformedMessage extends BrokerAdminUsingTestBase assertThat(header.getBodySize(), is(equalTo((long) content2Bytes.length))); BasicContentHeaderProperties properties = header.getProperties(); - Map receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + Map receivedHeaders = new HashMap<>(properties.getHeadersAsMap()); assertThat(receivedHeaders.isEmpty(), is(equalTo(true))); String receivedContent = diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java index 4bd9b5a..0e66f35 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java @@ -39,7 +39,6 @@ import org.junit.Test; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody; @@ -137,7 +136,7 @@ public class QueueTest extends BrokerAdminUsingTestBase assertThat(header.getBodySize(), is(equalTo((long) content2.length()))); BasicContentHeaderProperties properties = header.getProperties(); - Map receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + Map receivedHeaders = new HashMap<>(properties.getHeadersAsMap()); assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders2)))); assertThat(properties.getContentTypeAsString(), is(equalTo(contentType))); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org