qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [2/6] qpid-broker-j git commit: QPID-7830: [Broker-J] [AMQP 0-8..0-91] Cache AMQPShortStrings that relate to exchanges/routing keys and header values (that are usually drawn from a small domain) in a time/size bound cache.
Date Fri, 11 May 2018 16:31:57 GMT
QPID-7830: [Broker-J] [AMQP 0-8..0-91] Cache AMQPShortStrings that relate to exchanges/routing
keys and header values (that are usually drawn from a small domain) in a time/size bound cache.

Intent is to reduce the amount of tenured garbage produced when messages are repeatedly sent
to same destination. This should reduce frequency and length of GC pauses.

(cherry picked from commit 7d7b50824ad9f2a99b7de034ef36a529129b00ac)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/995d535b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/995d535b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/995d535b

Branch: refs/heads/7.0.x
Commit: 995d535bdea6fb5efcab9c165dbd9c807f94e72e
Parents: 83c6dfe
Author: Keith Wall <kwall@apache.org>
Authored: Thu Apr 26 13:11:45 2018 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Fri May 11 16:08:11 2018 +0100

----------------------------------------------------------------------
 .../server/protocol/v0_8/AMQShortString.java    | 162 ++++++++-----------
 .../protocol/v0_8/AMQShortStringTest.java       |  26 +--
 .../server/protocol/v0_8/MessageMetaData.java   |   8 +
 .../transport/BasicContentHeaderProperties.java | 162 +++----------------
 .../v0_8/transport/BasicPublishBody.java        |   8 +
 5 files changed, 117 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/995d535b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java
b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java
index 41a4322..0ee98c3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java
@@ -24,7 +24,10 @@ package org.apache.qpid.server.protocol.v0_8;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,15 +48,21 @@ public final class AMQShortString implements Comparable<AMQShortString>
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AMQShortString.class);
 
+    // Unfortunately CacheBuilder does not yet support keyEquivalence, so we have to wrap
the keys in ByteBuffers
+    // rather than using the byte arrays as keys.
+    private static ThreadLocal<Cache<ByteBuffer, AMQShortString>> CACHE =
+            ThreadLocal.withInitial(() -> CacheBuilder.newBuilder()
+                                                      .maximumSize(100)
+                                                      .expireAfterAccess(300, TimeUnit.SECONDS)
+                                                      .build());
+
     private final byte[] _data;
-    private final int _offset;
     private int _hashCode;
     private String _asString = null;
 
-    private final int _length;
-
     public static final AMQShortString EMPTY_STRING = createAMQShortString((String)null);
 
+
     private AMQShortString(byte[] data)
     {
         if (data == null)
@@ -64,48 +73,6 @@ public final class AMQShortString implements Comparable<AMQShortString>
         {
             throw new IllegalArgumentException("Cannot create AMQShortString with number
of octets over 255!");
         }
-        _data = data.clone();
-        _length = data.length;
-        _offset = 0;
-    }
-
-    private AMQShortString(String string)
-    {
-        final byte[] data = EncodingUtils.asUTF8Bytes(string);
-        final int length = data.length;
-        if (data.length> MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number
of octets over 255!");
-        }
-
-        int hash = 0;
-        for (int i = 0; i < length; i++)
-        {
-            data[i] = (byte) (0xFF & data[i]);
-            hash = (31 * hash) + data[i];
-        }
-        _hashCode = hash;
-        _data = data;
-
-        _length = length;
-        _offset = 0;
-
-        _asString = string == null ? "" : string;
-    }
-
-    private AMQShortString(byte[] data, final int offset, final int length)
-    {
-        if (length > MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number
of octets over 255!");
-        }
-        if (data == null)
-        {
-            throw new NullPointerException("Cannot create AMQShortString with null data[]");
-        }
-
-        _offset = offset;
-        _length = length;
         _data = data;
     }
 
@@ -132,18 +99,46 @@ public final class AMQShortString implements Comparable<AMQShortString>
             }
             byte[] data = new byte[length];
             buffer.get(data);
-            return new AMQShortString(data, 0, length);
+
+            final AMQShortString cached = CACHE.get().getIfPresent(ByteBuffer.wrap(data));
+            return cached != null ? cached : new AMQShortString(data);
         }
     }
 
     public static AMQShortString createAMQShortString(byte[] data)
     {
-        return new AMQShortString(data);
+        if (data == null)
+        {
+            throw new NullPointerException("Cannot create AMQShortString with null data[]");
+        }
+
+        final AMQShortString cached = CACHE.get().getIfPresent(ByteBuffer.wrap(data));
+        return cached != null ? cached : new AMQShortString(data);
     }
 
     public static AMQShortString createAMQShortString(String string)
     {
-        return new AMQShortString(string);
+        final byte[] data = EncodingUtils.asUTF8Bytes(string);
+
+        final AMQShortString cached = CACHE.get().getIfPresent(ByteBuffer.wrap(data));
+        if (cached != null)
+        {
+            return cached;
+        }
+        else
+        {
+            final AMQShortString shortString = new AMQShortString(data);
+
+            int hash = 0;
+            for (int i = 0; i < data.length; i++)
+            {
+                data[i] = (byte) (0xFF & data[i]);
+                hash = (31 * hash) + data[i];
+            }
+            shortString._hashCode = hash;
+            shortString._asString = string;
+            return  shortString;
+        }
     }
 
     /**
@@ -152,35 +147,25 @@ public final class AMQShortString implements Comparable<AMQShortString>
      */
     public int length()
     {
-        return _length;
+        return _data.length;
     }
 
     public char charAt(int index)
     {
-
-        return (char) _data[_offset + index];
+        return (char) _data[index];
 
     }
 
     public byte[] getBytes()
     {
-        if(_offset == 0 && _length == _data.length)
-        {
-            return _data.clone();
-        }
-        else
-        {
-            byte[] data = new byte[_length];
-            System.arraycopy(_data,_offset,data,0,_length);
-            return data;
-        }
+        return _data.clone();
     }
 
     public void writeToBuffer(QpidByteBuffer buffer)
     {
-        final int size = length();
-        buffer.put((byte)size);
-        buffer.put(_data, _offset, size);
+        final short size = (short) length();
+        buffer.putUnsignedByte(size);
+        buffer.put(_data, 0, size);
     }
 
 
@@ -223,40 +208,14 @@ public final class AMQShortString implements Comparable<AMQShortString>
             return false;
         }
 
-        final int length = _length;
+        final int length = _data.length;
 
-        if(length != otherString._length)
+        if(length != otherString._data.length)
         {
             return false;
         }
 
-
-        final byte[] data = _data;
-
-        final byte[] otherData = otherString._data;
-
-        final int offset = _offset;
-
-        final int otherOffset = otherString._offset;
-
-        if(offset == 0 && otherOffset == 0 && length == data.length &&
length == otherData.length)
-        {
-            return Arrays.equals(data, otherData);
-        }
-        else
-        {
-            int thisIdx = offset;
-            int otherIdx = otherOffset;
-            for(int i = length;  i-- != 0; )
-            {
-                if(!(data[thisIdx++] == otherData[otherIdx++]))
-                {
-                    return false;
-                }
-            }
-        }
-
-        return true;
+        return Arrays.equals(_data, otherString._data);
 
     }
 
@@ -270,7 +229,7 @@ public final class AMQShortString implements Comparable<AMQShortString>
 
             for (int i = 0; i < size; i++)
             {
-                hash = (31 * hash) + _data[i+_offset];
+                hash = (31 * hash) + _data[i];
             }
 
             _hashCode = hash;
@@ -284,7 +243,7 @@ public final class AMQShortString implements Comparable<AMQShortString>
     {
         if (_asString == null)
         {
-            _asString = new String(_data, _offset, _length, StandardCharsets.UTF_8);
+            _asString = new String(_data, StandardCharsets.UTF_8);
         }
         return _asString;
     }
@@ -310,8 +269,8 @@ public final class AMQShortString implements Comparable<AMQShortString>
 
             for (int i = 0; i < length(); i++)
             {
-                final byte d = _data[i+_offset];
-                final byte n = name._data[i+name._offset];
+                final byte d = _data[i];
+                final byte n = name._data[i];
                 if (d < n)
                 {
                     return -1;
@@ -329,8 +288,8 @@ public final class AMQShortString implements Comparable<AMQShortString>
 
     public boolean contains(final byte b)
     {
-        final int end = _length + _offset;
-        for(int i = _offset; i < end; i++)
+        final int end = _data.length;
+        for(int i = 0; i < end; i++)
         {
             if(_data[i] == b)
             {
@@ -340,6 +299,11 @@ public final class AMQShortString implements Comparable<AMQShortString>
         return false;
     }
 
+    public void intern()
+    {
+        CACHE.get().put(ByteBuffer.wrap(_data), this);
+    }
+
     public static AMQShortString validValueOf(Object obj)
     {
         return valueOf(obj,true,true);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/995d535b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java
b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java
index 659d3e8..8ff7c96 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java
@@ -27,12 +27,7 @@ import java.nio.charset.StandardCharsets;
 public class AMQShortStringTest extends QpidTestCase
 {
 
-    public static final AMQShortString HELLO = AMQShortString.createAMQShortString("Hello");
-    public static final AMQShortString HELL = AMQShortString.createAMQShortString("Hell");
-    public static final AMQShortString GOODBYE = AMQShortString.createAMQShortString("Goodbye");
-    public static final AMQShortString GOOD = AMQShortString.createAMQShortString("Good");
-    public static final AMQShortString BYE = AMQShortString.createAMQShortString("BYE");
-
+    private static final AMQShortString GOODBYE = AMQShortString.createAMQShortString("Goodbye");
 
     public void testEquals()
     {
@@ -57,9 +52,6 @@ public class AMQShortStringTest extends QpidTestCase
     }
 
     /**
-     * Test method for
-     * {@link AMQShortString#AMQShortString(java.lang.String)}
-     * <p>
      * Tests short string construction from string with length less than 255.
      */
     public void testCreateAMQShortStringString()
@@ -94,9 +86,6 @@ public class AMQShortStringTest extends QpidTestCase
     }
 
     /**
-     * Test method for
-     * {@link AMQShortString#AMQShortString(java.lang.String)}
-     * <p>
      * Tests an attempt to create an AMQP short string from string with length over 255
      */
     public void testCreateAMQShortStringStringOver255()
@@ -141,6 +130,19 @@ public class AMQShortStringTest extends QpidTestCase
         assertEquals("Unexpected null string from valueOf", null, shortString);
     }
 
+    public void testInterning()
+    {
+        AMQShortString str1 = AMQShortString.createAMQShortString("hello");
+        str1.intern();
+        AMQShortString str2 = AMQShortString.createAMQShortString("hello");
+        AMQShortString str3 = AMQShortString.createAMQShortString("hello".getBytes(StandardCharsets.UTF_8));
+
+        assertEquals(str1, str2);
+        assertEquals(str1, str3);
+        assertSame(str1, str2);
+        assertSame(str1, str3);
+    }
+
     /**
      * A helper method to generate a string with given length containing given
      * character

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/995d535b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index b182c8d..2ad8511 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -164,7 +164,15 @@ public class MessageMetaData implements StorableMessageMetaData
 
                 ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, size);
                 final AMQShortString exchange = AMQShortString.readAMQShortString(buf);
+                if (exchange != null)
+                {
+                    exchange.intern();
+                }
                 final AMQShortString routingKey = AMQShortString.readAMQShortString(buf);
+                if (routingKey != null)
+                {
+                    routingKey.intern();
+                }
 
                 final byte flags = buf.get();
                 long arrivalTime = buf.getLong();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/995d535b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 6d3a55e..52183fc 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -322,144 +322,6 @@ public class BasicContentHeaderProperties
         }
     }
 
-    public int read(QpidByteBuffer input)
-    {
-
-        _propertyFlags = input.getUnsignedShort();
-        int length = 2;
-        if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
-        {
-            length++;
-            _contentType = AMQShortString.readAMQShortString(input);
-            if(_contentType != null)
-            {
-                length += _contentType.length();
-            }
-        }
-
-        if ((_propertyFlags & ENCODING_MASK) != 0)
-        {
-            length++;
-            _encoding = AMQShortString.readAMQShortString(input);
-            if(_encoding != null)
-            {
-                length += _encoding.length();
-            }
-        }
-
-        if ((_propertyFlags & HEADERS_MASK) != 0)
-        {
-            int fieldTableLength = input.getInt();
-
-            _headers = new FieldTable(input, fieldTableLength);
-
-            length += 4;
-            length += fieldTableLength;
-        }
-
-        if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
-        {
-            _deliveryMode = input.get();
-            length++;
-        }
-
-        if ((_propertyFlags & PRIORITY_MASK) != 0)
-        {
-            _priority = input.get();
-            length++;
-        }
-
-        if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
-        {
-            length++;
-            _correlationId = AMQShortString.readAMQShortString(input);
-            if(_correlationId != null)
-            {
-                length += _correlationId.length();
-            }
-        }
-
-        if ((_propertyFlags & REPLY_TO_MASK) != 0)
-        {
-            length++;
-            _replyTo = AMQShortString.readAMQShortString(input);
-            if(_replyTo != null)
-            {
-                length += _replyTo.length();
-            }
-        }
-
-        if ((_propertyFlags & EXPIRATION_MASK) != 0)
-        {
-            length++;
-            AMQShortString expiration = AMQShortString.readAMQShortString(input);
-            if(expiration != null)
-            {
-                length += expiration.length();
-                _expiration = Long.parseLong(expiration.toString());
-            }
-        }
-
-        if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
-        {
-            length++;
-            _messageId = AMQShortString.readAMQShortString(input);
-            if(_messageId != null)
-            {
-                length += _messageId.length();
-            }
-        }
-
-        if ((_propertyFlags & TIMESTAMP_MASK) != 0)
-        {
-            _timestamp = input.getLong();
-            length += 8;
-        }
-
-        if ((_propertyFlags & TYPE_MASK) != 0)
-        {
-            length++;
-            _type = AMQShortString.readAMQShortString(input);
-            if(_type != null)
-            {
-                length += _type.length();
-            }
-        }
-
-        if ((_propertyFlags & USER_ID_MASK) != 0)
-        {
-            length++;
-            _userId = AMQShortString.readAMQShortString(input);
-            if(_userId != null)
-            {
-                length += _userId.length();
-            }
-        }
-
-        if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
-        {
-            length++;
-            _appId = AMQShortString.readAMQShortString(input);
-            if(_appId != null)
-            {
-                length += _appId.length();
-            }
-        }
-
-        if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
-        {
-            length++;
-            _clusterId = AMQShortString.readAMQShortString(input);
-            if(_clusterId != null)
-            {
-                length += _clusterId.length();
-            }
-        }
-
-        return length;
-    }
-
-
     public synchronized long writePropertyListPayload(final ByteBufferSender sender)
     {
         if(useEncodedForm())
@@ -512,11 +374,19 @@ public class BasicContentHeaderProperties
         if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
         {
             _contentType = AMQShortString.readAMQShortString(buffer);
+            if (_contentType != null)
+            {
+                _contentType.intern();
+            }
         }
 
         if ((_propertyFlags & ENCODING_MASK) != 0)
         {
             _encoding = AMQShortString.readAMQShortString(buffer);
+            if (_encoding != null)
+            {
+                _encoding.intern();
+            }
         }
 
         if ((_propertyFlags & HEADERS_MASK) != 0)
@@ -548,6 +418,10 @@ public class BasicContentHeaderProperties
         if ((_propertyFlags & REPLY_TO_MASK) != 0)
         {
             _replyTo = AMQShortString.readAMQShortString(buffer);
+            if (_replyTo != null)
+            {
+                _replyTo.intern();
+            }
         }
 
         if ((_propertyFlags & EXPIRATION_MASK) != 0)
@@ -573,16 +447,28 @@ public class BasicContentHeaderProperties
         if ((_propertyFlags & USER_ID_MASK) != 0)
         {
             _userId = AMQShortString.readAMQShortString(buffer);
+            if (_userId != null)
+            {
+                _userId.intern();
+            }
         }
 
         if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
         {
             _appId = AMQShortString.readAMQShortString(buffer);
+            if (_appId != null)
+            {
+                _appId.intern();
+            }
         }
 
         if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
         {
             _clusterId = AMQShortString.readAMQShortString(buffer);
+            if (_clusterId != null)
+            {
+                _clusterId.intern();
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/995d535b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicPublishBody.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicPublishBody.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicPublishBody.java
index 9821383..a8243ea 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicPublishBody.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicPublishBody.java
@@ -152,7 +152,15 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
 
         int ticket = buffer.getUnsignedShort();
         AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
+        if (exchange != null)
+        {
+            exchange.intern();
+        }
         AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
+        if (routingKey != null)
+        {
+            routingKey.intern();
+        }
         byte bitfield = buffer.get();
 
         boolean mandatory = (bitfield & 0x01) != 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message