activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From michaelpea...@apache.org
Subject [5/6] activemq-artemis git commit: ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol
Date Wed, 17 Jan 2018 08:33:59 GMT
ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol

The commit contains:
- a general purpose interner implementation
- StringValue/SimpleString interner specializations
- TypedProperties keys/values string interning for SessionSendMessage decoding


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d776edd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d776edd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d776edd

Branch: refs/heads/master
Commit: 8d776eddfcc12bfc73771c04e376583c9fa221e1
Parents: 00bd989
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Thu Jan 4 15:22:05 2018 +0100
Committer: Michael Pearce <michael.andre.pearce@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 138 +++++++++++++++-
 .../artemis/utils/AbstractInterner.java         | 157 +++++++++++++++++++
 .../utils/collections/TypedProperties.java      |  60 ++++++-
 .../artemis/core/message/impl/CoreMessage.java  |  36 ++++-
 .../core/protocol/ClientPacketDecoder.java      |   4 +
 .../impl/ActiveMQClientProtocolManager.java     |   4 +-
 .../core/protocol/ServerPacketDecoder.java      |  29 +++-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 ...ctiveMQServerSideProtocolManagerFactory.java |   4 +-
 9 files changed, 404 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 79909c7..e24e245 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.utils.AbstractInterner;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -31,6 +33,129 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString>
{
 
+   public static final class Interner extends AbstractInterner<SimpleString> {
+
+      private final int maxLength;
+
+      public Interner(final int capacity, final int maxCharsLength) {
+         super(capacity);
+         this.maxLength = maxCharsLength;
+      }
+
+      @Override
+      protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int
offset, final int length) {
+         return SimpleString.isEqual(entry, byteBuf, offset, length);
+      }
+
+      @Override
+      protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+         assert length % 2 == 0 : "length must be a multiple of 2";
+         final int expectedStringLength = length >> 1;
+         return expectedStringLength <= maxLength;
+      }
+
+      @Override
+      protected SimpleString create(final ByteBuf byteBuf, final int length) {
+         return readSimpleString(byteBuf, length);
+      }
+   }
+
+   /**
+    * Returns {@code true} if the {@link SimpleString} content encoded in {@code bytes}
is equal to {@code s},
+    * {@code false} otherwise.
+    * <p>
+    * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf,
int)}, i.e. starting right after the
+    * length field.
+    */
+   public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset,
final int length) {
+      if (s == null) {
+         return false;
+      }
+      final byte[] chars = s.getData();
+      if (chars.length != length)
+         return false;
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         if ((offset + length) > bytes.writerIndex()) {
+            throw new IndexOutOfBoundsException();
+         }
+         if (bytes.hasArray()) {
+            return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset,
length);
+         } else if (bytes.hasMemoryAddress()) {
+            return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufIsEqual(chars, bytes, offset, length);
+   }
+
+   private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int
offset, final int length) {
+      for (int i = 0; i < length; i++)
+         if (chars[i] != bytes.getByte(offset + i))
+            return false;
+      return true;
+   }
+
+   private static boolean batchOnHeapIsEqual(final byte[] chars,
+                                             final byte[] array,
+                                             final int arrayOffset,
+                                             final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      int bytesIndex = arrayOffset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesIndex += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesIndex++;
+         charsIndex++;
+      }
+      return true;
+   }
+
+   private static boolean batchOffHeapIsEqual(final byte[] chars,
+                                              final long address,
+                                              final int offset,
+                                              final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      long bytesAddress = address + offset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(bytesAddress);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesAddress += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(bytesAddress);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesAddress++;
+         charsIndex++;
+      }
+      return true;
+   }
+
    private static final long serialVersionUID = 4204223851422244307L;
 
    // Attributes
@@ -134,7 +259,6 @@ public final class SimpleString implements CharSequence, Serializable,
Comparabl
       return subSeq(start, end);
    }
 
-
    public static SimpleString readNullableSimpleString(ByteBuf buffer) {
       int b = buffer.readByte();
       if (b == DataConstants.NULL) {
@@ -143,13 +267,13 @@ public final class SimpleString implements CharSequence, Serializable,
Comparabl
       return readSimpleString(buffer);
    }
 
-
    public static SimpleString readSimpleString(ByteBuf buffer) {
       int len = buffer.readInt();
-      if (len > buffer.readableBytes()) {
-         throw new IndexOutOfBoundsException();
-      }
-      byte[] data = new byte[len];
+      return readSimpleString(buffer, len);
+   }
+
+   public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
+      byte[] data = new byte[length];
       buffer.readBytes(data);
       return new SimpleString(data);
    }
@@ -169,8 +293,6 @@ public final class SimpleString implements CharSequence, Serializable,
Comparabl
       buffer.writeBytes(data);
    }
 
-
-
    public SimpleString subSeq(final int start, final int end) {
       int len = data.length >> 1;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
new file mode 100644
index 0000000..7e1fe40
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Unlike {@link String#intern()}, it holds a fixed number of entries and,
+ * when used by concurrent threads, it doesn't ensure the uniqueness of the entries, i.e.
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractInterner<T> {
+
+   private final T[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractInterner(final int capacity) {
+      entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Batch hash code implementation that works best if {@code bytes}
+    * contains an encoded {@link org.apache.activemq.artemis.api.core.SimpleString}.
+    */
+   private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //if the platform allows it, the hash code could be computed without bounds checking
+         if (bytes.hasArray()) {
+            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
+         } else if (bytes.hasMemoryAddress()) {
+            return offHeapHashCode(bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufHashCode(bytes, offset, length);
+   }
+
+   private static int onHeapHashCode(final byte[] bytes, final int offset, final int length)
{
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int offHeapHashCode(final long address, final int offset, final int length)
{
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int
length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         final short shortLE = byteBuf.getShortLE(arrayIndex);
+         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE)
: shortLE;
+         hashCode = 31 * hashCode + nativeShort;
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   /**
+    * Returns {@code true} if {@code length} bytes of {@code byteBuf} content starting at {@link ByteBuf#readerIndex()}
can be interned,
+    * {@code false} otherwise.
+    */
+   protected abstract boolean canIntern(ByteBuf byteBuf, int length);
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract T create(ByteBuf byteBuf, int length);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is the same as that of {@code byteBuf} at
the specified {@code offset}
+    * and {@code length}, {@code false} otherwise.
+    */
+   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
+
+   /**
+    * Returns an interned entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length}
after it.
+    */
+   public final T intern(final ByteBuf byteBuf, final int length) {
+      if (!canIntern(byteBuf, length)) {
+         return create(byteBuf, length);
+      } else {
+         if (!byteBuf.isReadable(length)) {
+            throw new IndexOutOfBoundsException();
+         }
+         final int bytesOffset = byteBuf.readerIndex();
+         final int hashCode = hashCode(byteBuf, bytesOffset, length);
+         //fast % operation with power of 2 entries.length
+         final int firstIndex = hashCode & mask;
+         final T firstEntry = entries[firstIndex];
+         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return firstEntry;
+         }
+         final int secondIndex = (hashCode >> shift) & mask;
+         final T secondEntry = entries[secondIndex];
+         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return secondEntry;
+         }
+         final T internedEntry = create(byteBuf, length);
+         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+         entries[entryIndex] = internedEntry;
+         return internedEntry;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index b17156e..a3e4876 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
+import org.apache.activemq.artemis.utils.AbstractInterner;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -94,6 +95,7 @@ public class TypedProperties {
 
    public void putByteProperty(final SimpleString key, final byte value) {
       checkCreateProperties();
+      checkCreateProperties();
       doPutValue(key, ByteValue.valueOf(value));
    }
 
@@ -329,7 +331,9 @@ public class TypedProperties {
       }
    }
 
-   public synchronized void decode(final ByteBuf buffer) {
+   public synchronized void decode(final ByteBuf buffer,
+                                   final SimpleString.Interner keyInterner,
+                                   final StringValue.Interner valueInterner) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -342,10 +346,15 @@ public class TypedProperties {
          size = 0;
 
          for (int i = 0; i < numHeaders; i++) {
+            final SimpleString key;
             int len = buffer.readInt();
-            byte[] data = new byte[len];
-            buffer.readBytes(data);
-            SimpleString key = new SimpleString(data);
+            if (keyInterner != null) {
+               key = keyInterner.intern(buffer, len);
+            } else {
+               byte[] data = new byte[len];
+               buffer.readBytes(data);
+               key = new SimpleString(data);
+            }
 
             byte type = buffer.readByte();
 
@@ -403,7 +412,12 @@ public class TypedProperties {
                   break;
                }
                case STRING: {
-                  val = new StringValue(buffer);
+                  if (valueInterner != null) {
+                     final int length = buffer.readInt();
+                     val = valueInterner.intern(buffer, length);
+                  } else {
+                     val = new StringValue(buffer);
+                  }
                   doPutValue(key, val);
                   break;
                }
@@ -415,6 +429,10 @@ public class TypedProperties {
       }
    }
 
+   public synchronized void decode(final ByteBuf buffer) {
+      decode(buffer, null, null);
+   }
+
    public synchronized void encode(final ByteBuf buffer) {
       if (properties == null) {
          buffer.writeByte(DataConstants.NULL);
@@ -881,7 +899,37 @@ public class TypedProperties {
       }
    }
 
-   private static final class StringValue extends PropertyValue {
+   public static final class StringValue extends PropertyValue {
+
+      public static final class Interner extends AbstractInterner<StringValue> {
+
+         private final int maxLength;
+
+         public Interner(final int capacity, final int maxCharsLength) {
+            super(capacity);
+            this.maxLength = maxCharsLength;
+         }
+
+         @Override
+         protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final
int offset, final int length) {
+            if (entry == null) {
+               return false;
+            }
+            return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+         }
+
+         @Override
+         protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+            assert length % 2 == 0 : "length must be a multiple of 2";
+            final int expectedStringLength = length >> 1;
+            return expectedStringLength <= maxLength;
+         }
+
+         @Override
+         protected StringValue create(final ByteBuf byteBuf, final int length) {
+            return new StringValue(SimpleString.readSimpleString(byteBuf, length));
+         }
+      }
 
       final SimpleString val;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index b0656b6..4ebf97e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -42,8 +43,6 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-
 /** Note: you shouldn't change properties using multi-threads. Change your properties before
you can send it to multiple
  *  consumers */
 public class CoreMessage extends RefCountMessage implements ICoreMessage {
@@ -94,7 +93,18 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
 
    protected volatile TypedProperties properties;
 
+   private final SimpleString.Interner keysInterner;
+   private final TypedProperties.StringValue.Interner valuesInterner;
+
+   public CoreMessage(final SimpleString.Interner keysInterner,
+                      final TypedProperties.StringValue.Interner valuesInterner) {
+      this.keysInterner = keysInterner;
+      this.valuesInterner = valuesInterner;
+   }
+
    public CoreMessage() {
+      this.keysInterner = null;
+      this.valuesInterner = null;
    }
 
    /** On core there's no delivery annotation */
@@ -318,6 +328,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
    public CoreMessage(long id, int bufferSize) {
       this.initBuffer(bufferSize);
       this.setMessageID(id);
+      this.keysInterner = null;
+      this.valuesInterner = null;
    }
 
    protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -331,6 +343,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
       this.timestamp = other.timestamp;
       this.priority = other.priority;
       this.userID = other.userID;
+      this.keysInterner = other.keysInterner;
+      this.valuesInterner = other.valuesInterner;
       if (copyProperties != null) {
          this.properties = new TypedProperties(copyProperties);
       }
@@ -464,7 +478,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
       if (properties == null) {
          TypedProperties properties = new TypedProperties();
          if (buffer != null && propertiesLocation >= 0) {
-            properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+            final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
+            properties.decode(byteBuf, keysInterner, valuesInterner);
          }
          this.properties = properties;
       }
@@ -528,8 +543,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
    private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties)
{
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
-
-      address = SimpleString.readNullableSimpleString(buffer);
+      int b = buffer.readByte();
+      if (b != DataConstants.NULL) {
+         final int length = buffer.readInt();
+         if (keysInterner != null) {
+            address = keysInterner.intern(buffer, length);
+         } else {
+            address = SimpleString.readSimpleString(buffer, length);
+         }
+      } else {
+         address = null;
+      }
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -547,7 +571,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties();
-         properties.decode(buffer);
+         properties.decode(buffer, keysInterner, valuesInterner);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 1022030..787e499 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -34,6 +34,10 @@ public class ClientPacketDecoder extends PacketDecoder {
    private static final long serialVersionUID = 6952614096979334582L;
    public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
 
+   protected ClientPacketDecoder() {
+
+   }
+
    @Override
    public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
       final byte packetType = in.readByte();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 93432b8..f0005ff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -409,7 +409,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
                                      List<Interceptor> incomingInterceptors,
                                      List<Interceptor> outgoingInterceptors,
                                      TopologyResponseHandler topologyResponseHandler) {
-      this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection,
callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
+      this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection,
callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
 
       this.topologyResponseHandler = topologyResponseHandler;
 
@@ -510,7 +510,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
       }
    }
 
-   protected PacketDecoder getPacketDecoder() {
+   protected PacketDecoder createPacketDecoder() {
       return ClientPacketDecoder.INSTANCE;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 0584476..2276fdb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -83,16 +85,34 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ServerPacketDecoder extends ClientPacketDecoder {
 
+   private static final int UUID_LENGTH = 36;
+   private static final int DEFAULT_INTERNER_CAPACITY = 32;
    private static final long serialVersionUID = 3348673114388400766L;
-   public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
+   private SimpleString.Interner keysInterner;
+   private TypedProperties.StringValue.Interner valuesInterner;
 
-   private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection
connection) {
+   public ServerPacketDecoder() {
+      this.keysInterner = null;
+      this.valuesInterner = null;
+   }
+
+   private void initializeInternersIfNeeded() {
+      if (this.keysInterner == null) {
+         this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+      }
+      if (this.valuesInterner == null) {
+         this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY,
UUID_LENGTH);
+      }
+   }
+
+   private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection
connection) {
       final SessionSendMessage sendMessage;
 
+      initializeInternersIfNeeded();
       if (connection.isVersionBeforeAddressChange()) {
-         sendMessage = new SessionSendMessage_1X(new CoreMessage());
+         sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
       } else {
-         sendMessage = new SessionSendMessage(new CoreMessage());
+         sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
       }
 
       sendMessage.decode(in);
@@ -259,5 +279,4 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
 
       return packet;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index c9262fa..af9e131 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor>
{
 
       Executor connectionExecutor = server.getExecutorFactory().getExecutor();
 
-      final CoreRemotingConnection rc = new RemotingConnectionImpl(ServerPacketDecoder.INSTANCE,
connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled()
? connectionExecutor : null, server.getNodeID());
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(),
connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled()
? connectionExecutor : null, server.getNodeID());
 
       Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
index 85ad3a3..209f68f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
@@ -65,8 +65,8 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
    class ActiveMQReplicationProtocolManager extends ActiveMQClientProtocolManager {
 
       @Override
-      protected PacketDecoder getPacketDecoder() {
-         return ServerPacketDecoder.INSTANCE;
+      protected PacketDecoder createPacketDecoder() {
+         return new ServerPacketDecoder();
       }
    }
 }


Mime
View raw message