activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [08/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Mon, 06 Mar 2017 11:53:55 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 622f042..1569822 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import java.io.InputStream;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -31,27 +30,26 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
-import org.apache.activemq.artemis.utils.TypedProperties;
 import org.apache.activemq.artemis.utils.UUID;
 import org.junit.Assert;
 import org.junit.Test;
@@ -283,214 +281,164 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
    }
 
-   class FakeMessage implements ServerMessage {
-
-      final long id;
-
-      FakeMessage(final long id) {
-         this.id = id;
-      }
-
-      @Override
-      public FakeMessage setMessageID(long id) {
-         return this;
-      }
+   class FakeMessage extends RefCountMessage {
 
       @Override
-      public long getMessageID() {
-         return id;
-      }
-
-      @Override
-      public MessageReference createReference(Queue queue) {
+      public RoutingType getRouteType() {
          return null;
       }
 
       @Override
-      public void forceAddress(SimpleString address) {
-
-      }
-
-      @Override
-      public int incrementRefCount() throws Exception {
-         return 0;
-      }
-
-      @Override
-      public int decrementRefCount() throws Exception {
-         return 0;
+      public SimpleString getReplyTo() {
+         return null;
       }
 
       @Override
-      public int incrementDurableRefCount() {
-         return 0;
+      public Message setReplyTo(SimpleString address) {
+         return null;
       }
 
       @Override
-      public int decrementDurableRefCount() {
-         return 0;
+      public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+         return false;
       }
 
       @Override
-      public ServerMessage copy(long newID) {
+      public Object removeDeliveryAnnoationProperty(SimpleString key) {
          return null;
       }
 
       @Override
-      public ServerMessage copy() {
+      public Object getDeliveryAnnotationProperty(SimpleString key) {
          return null;
       }
 
       @Override
-      public int getMemoryEstimate() {
-         return 0;
-      }
+      public void persist(ActiveMQBuffer targetRecord) {
 
-      @Override
-      public int getRefCount() {
-         return 0;
       }
 
       @Override
-      public ServerMessage makeCopyForExpiryOrDLA(long newID,
-                                                  MessageReference originalReference,
-                                                  boolean expiry,
-                                                  boolean copyOriginalHeaders) throws Exception {
+      public Long getScheduledDeliveryTime() {
          return null;
       }
 
       @Override
-      public void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry) {
-
-      }
-
-      @Override
-      public void setPagingStore(PagingStore store) {
+      public void reloadPersistence(ActiveMQBuffer record) {
 
       }
 
       @Override
-      public PagingStore getPagingStore() {
+      public Persister<Message> getPersister() {
          return null;
       }
 
       @Override
-      public boolean hasInternalProperties() {
-         return false;
-      }
-
-      @Override
-      public boolean storeIsPaging() {
-         return false;
+      public int getPersistSize() {
+         return 0;
       }
+      final long id;
 
       @Override
-      public void encodeMessageIDToBuffer() {
-
+      public CoreMessage toCore() {
+         return null;
       }
 
-      @Override
-      public byte[] getDuplicateIDBytes() {
-         return new byte[0];
+      FakeMessage(final long id) {
+         this.id = id;
       }
 
       @Override
-      public Object getDuplicateProperty() {
-         return null;
+      public FakeMessage setMessageID(long id) {
+         return this;
       }
 
       @Override
-      public void encode(ActiveMQBuffer buffer) {
-
+      public long getMessageID() {
+         return id;
       }
 
       @Override
-      public void decode(ActiveMQBuffer buffer) {
-
+      public int incrementRefCount() throws Exception {
+         return 0;
       }
 
       @Override
-      public void decodeFromBuffer(ActiveMQBuffer buffer) {
-
+      public int decrementRefCount() throws Exception {
+         return 0;
       }
 
       @Override
-      public int getEndOfMessagePosition() {
+      public int incrementDurableRefCount() {
          return 0;
       }
 
       @Override
-      public int getEndOfBodyPosition() {
+      public int decrementDurableRefCount() {
          return 0;
       }
 
       @Override
-      public void bodyChanged() {
-
+      public Message copy(long newID) {
+         return null;
       }
 
       @Override
-      public boolean isServerMessage() {
-         return false;
+      public Message copy() {
+         return null;
       }
 
       @Override
-      public ActiveMQBuffer getEncodedBuffer() {
-         return null;
+      public int getMemoryEstimate() {
+         return 0;
       }
 
       @Override
-      public int getHeadersAndPropertiesEncodeSize() {
+      public int getRefCount() {
          return 0;
       }
 
       @Override
-      public ActiveMQBuffer getWholeBuffer() {
-         return null;
+      public byte[] getDuplicateIDBytes() {
+         return new byte[0];
       }
 
       @Override
-      public void encodeHeadersAndProperties(ActiveMQBuffer buffer) {
-
+      public Object getDuplicateProperty() {
+         return null;
       }
 
       @Override
-      public void decodeHeadersAndProperties(ActiveMQBuffer buffer) {
+      public void messageChanged() {
 
       }
 
       @Override
-      public BodyEncoder getBodyEncoder() throws ActiveMQException {
+      public UUID getUserID() {
          return null;
       }
 
       @Override
-      public InputStream getBodyInputStream() {
+      public String getAddress() {
          return null;
       }
 
       @Override
-      public void setAddressTransient(SimpleString address) {
-
-      }
-
-      @Override
-      public TypedProperties getTypedProperties() {
+      public SimpleString getAddressSimpleString() {
          return null;
       }
 
       @Override
-      public UUID getUserID() {
+      public Message setBuffer(ByteBuf buffer) {
          return null;
       }
 
       @Override
-      public FakeMessage setUserID(UUID userID) {
-         return this;
+      public ByteBuf getBuffer() {
+         return null;
       }
-
       @Override
-      public SimpleString getAddress() {
+      public Message setAddress(String address) {
          return null;
       }
 
@@ -500,11 +448,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public byte getType() {
-         return 0;
-      }
-
-      @Override
       public boolean isDurable() {
          return false;
       }
@@ -560,16 +503,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public ActiveMQBuffer getBodyBuffer() {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer getBodyBufferDuplicate() {
-         return null;
-      }
-
-      @Override
       public Message putBooleanProperty(SimpleString key, boolean value) {
          return null;
       }
@@ -825,13 +758,18 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public FakeMessage writeBodyBufferBytes(byte[] bytes) {
-         return this;
+      public Message setUserID(Object userID) {
+         return null;
       }
 
       @Override
-      public FakeMessage writeBodyBufferString(String string) {
-         return this;
+      public void receiveBuffer(ByteBuf buffer) {
+
+      }
+
+      @Override
+      public void sendBuffer(ByteBuf buffer, int count) {
+
       }
    }
 
@@ -1221,7 +1159,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public boolean hasMatchingConsumer(ServerMessage message) {
+      public boolean hasMatchingConsumer(Message message) {
          return false;
       }
 
@@ -1338,12 +1276,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void route(ServerMessage message, RoutingContext context) throws Exception {
+      public void route(Message message, RoutingContext context) throws Exception {
 
       }
 
       @Override
-      public void routeWithAck(ServerMessage message, RoutingContext context) {
+      public void routeWithAck(Message message, RoutingContext context) {
 
       }
 
@@ -1366,5 +1304,9 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public void decDelivering(int size) {
 
       }
+
+
+
+
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index ee80054..b1ea206 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -26,13 +26,13 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -323,7 +322,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void storeMessage(ServerMessage message) throws Exception {
+      public void storeMessage(Message message) throws Exception {
 
       }
 
@@ -368,7 +367,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void storeMessageTransactional(long txID, ServerMessage message) throws Exception {
+      public void storeMessageTransactional(long txID, Message message) throws Exception {
 
       }
 
@@ -439,7 +438,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception {
+      public LargeServerMessage createLargeMessage(long id, Message message) throws Exception {
          return null;
       }
 
@@ -489,11 +488,6 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception {
-
-      }
-
-      @Override
       public void deletePageTransactional(long recordID) throws Exception {
 
       }
@@ -643,7 +637,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
 
       @Override
       public boolean addToPage(PagingStore store,
-                               ServerMessage msg,
+                               Message msg,
                                Transaction tx,
                                RouteContextList listCtx) throws Exception {
          return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 2f12b05..0bb177d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -94,6 +95,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -117,14 +119,12 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -855,7 +855,7 @@ public abstract class ActiveMQTestBase extends Assert {
       return testDir1 + "/journal";
    }
 
-   protected String getJournalDir(final int index, final boolean backup) {
+   public String getJournalDir(final int index, final boolean backup) {
       return getJournalDir(getTestDir(), index, backup);
    }
 
@@ -2079,8 +2079,8 @@ public abstract class ActiveMQTestBase extends Assert {
       }
    }
 
-   protected ServerMessage generateMessage(final long id) {
-      ServerMessage message = new ServerMessageImpl(id, 1000);
+   protected Message generateMessage(final long id) {
+      ICoreMessage message = new CoreMessage(id, 1000);
 
       message.setMessageID(id);
 
@@ -2092,9 +2092,9 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected MessageReference generateReference(final Queue queue, final long id) {
-      ServerMessage message = generateMessage(id);
+      Message message = generateMessage(id);
 
-      return message.createReference(queue);
+      return MessageReference.Factory.createReference(message, queue);
    }
 
    protected int calculateRecordSize(final int size, final int alignment) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 9ed5584..0691e95 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -231,8 +231,6 @@
       <journal-compact-percentage>33</journal-compact-percentage>
       <journal-compact-min-files>123</journal-compact-min-files>
       <journal-max-io>56546</journal-max-io>
-      <perf-blast-pages>5</perf-blast-pages>
-      <run-sync-speed-test>true</run-sync-speed-test>
       <server-dump-interval>5000</server-dump-interval>
       <memory-warning-threshold>95</memory-warning-threshold>
       <memory-measure-interval>54321</memory-measure-interval>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-tools/src/test/resources/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index fcb7a20..090f968 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -673,22 +673,6 @@
             </xsd:annotation>
          </xsd:element>
 
-         <xsd:element name="perf-blast-pages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  XXX Only meant to be used by project developers
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
-
-         <xsd:element name="run-sync-speed-test" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  XXX Only meant to be used by project developers
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
-
          <xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
index d7a7bdc..df9d79e 100644
--- a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
+++ b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
@@ -16,14 +16,14 @@
  */
 package org.apache.activemq.artemis.jms.example;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 
 public class HatColourChangeTransformer implements Transformer {
 
    @Override
-   public ServerMessage transform(final ServerMessage message) {
+   public Message transform(final Message message) {
       SimpleString propName = new SimpleString("hat");
 
       SimpleString oldProp = message.getSimpleStringProperty(propName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
index 22272d0..2f75d4c 100644
--- a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
+++ b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
@@ -16,16 +16,13 @@
  */
 package org.apache.activemq.artemis.jms.example;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 
 public class AddForwardingTimeTransformer implements Transformer {
 
    @Override
-   public ServerMessage transform(final ServerMessage message) {
-      message.putLongProperty(new SimpleString("time_of_forward"), System.currentTimeMillis());
-
+   public Message transform(final Message message) {
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b056ad..beb6275 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,11 +82,11 @@
       <jetty.version>9.4.0.M1</jetty.version>
       <jgroups.version>3.6.9.Final</jgroups.version>
       <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
-      <netty.version>4.1.5.Final</netty.version>
+      <netty.version>4.1.6.Final</netty.version>
       <proton.version>0.16.0</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
-      <qpid.jms.version>0.11.0</qpid.jms.version>
+      <qpid.jms.version>0.20.0</qpid.jms.version>
       <johnzon.version>0.9.5</johnzon.version>
       <json-p.spec.version>1.0-alpha-1</json-p.spec.version>
       <javax.inject.version>1</javax.inject.version>
@@ -1006,6 +1006,7 @@
                     <arg>-Xep:StaticAccessedFromInstance:ERROR</arg>
                     <arg>-Xep:SynchronizeOnNonFinalField:ERROR</arg>
                     <arg>-Xep:WaitNotInLoop:ERROR</arg>
+                    <arg>-Xdiags:verbose</arg>
                  </compilerArgs>
                </configuration>
                <dependencies>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
index 12f5568..17f601a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
@@ -37,6 +37,11 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator {
    }
 
    @Override
+   public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
+      return POOLED.calculateNewCapacity(minNewCapacity, maxCapacity);
+   }
+
+   @Override
    public ByteBuf buffer() {
       return UNPOOLED.heapBuffer();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
index dea8602..d9bddcb 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
@@ -51,6 +51,12 @@ public class UnmodifiableDelivery implements Delivery {
       }
    }
 
+   /* waiting Pull Request sent
+   @Override
+   public int getDataLength() {
+      return delivery.getDataLength();
+   } */
+
    @Override
    public DeliveryState getLocalState() {
       return delivery.getLocalState();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
index 833302d..162a512 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -61,7 +62,7 @@ public class EncodersBench {
       this.byteBuffer.order(ByteOrder.nativeOrder());
       this.addJournalRecordEncoder = new AddJournalRecordEncoder();
 
-      this.record = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
+      this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
       this.record.setFileID(1);
       this.record.setCompactCount((short) 1);
       this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder()));
@@ -86,7 +87,7 @@ public class EncodersBench {
    @Benchmark
    public int encodeUnalignedWithGarbage() {
       outBuffer.clear();
-      final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
+      final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
       addRecord.setFileID(1);
       addRecord.setCompactCount((short) 1);
       addRecord.encode(outBuffer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
index ef71e89..03e2ddc 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.tests.extras.byteman;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
@@ -111,7 +111,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase {
    static int count = 20;
    static CountDownLatch stopLatch = new CountDownLatch(1);
 
-   public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits) {
+   public static void pause2(Message msgI, boolean sendBlocking, final ClientProducerCredits theCredits) {
       if (msgI.containsProperty("__AMQ_CID")) {
          count--;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java
deleted file mode 100644
index 1ff58cd..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class MessageCopyTest {
-
-   @Test
-   @BMRules(
-
-      rules = {@BMRule(
-         name = "message-copy0",
-         targetClass = "org.apache.activemq.artemis.core.server.impl.ServerMessageImpl",
-         targetMethod = "copy()",
-         targetLocation = "ENTRY",
-         action = "System.out.println(\"copy\"), waitFor(\"encode-done\")"), @BMRule(
-         name = "message-copy-done",
-         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage",
-         targetMethod = "encode(org.apache.activemq.artemis.spi.core.protocol.RemotingConnection)",
-         targetLocation = "EXIT",
-         action = "System.out.println(\"encodeDone\"), signalWake(\"encode-done\", true)"), @BMRule(
-         name = "message-copy1",
-         targetClass = "org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper",
-         targetMethod = "copy(int, int)",
-         condition = "Thread.currentThread().getName().equals(\"T1\")",
-         targetLocation = "EXIT",
-         action = "System.out.println(\"setIndex at \" + Thread.currentThread().getName()), waitFor(\"finish-read\")"), @BMRule(
-         name = "JMSServer.stop wait-init",
-         targetClass = "org.apache.activemq.artemis.tests.extras.byteman.MessageCopyTest",
-         targetMethod = "simulateRead",
-         targetLocation = "EXIT",
-         action = "signalWake(\"finish-read\", true)")})
-   public void testMessageCopyIssue() throws Exception {
-      final long RUNS = 1;
-      final ServerMessageImpl msg = new ServerMessageImpl(123, 18);
-
-      msg.setMessageID(RandomUtil.randomLong());
-      msg.encodeMessageIDToBuffer();
-      msg.setAddress(new SimpleString("Batatantkashf aksjfh aksfjh askfdjh askjfh "));
-
-      final AtomicInteger errors = new AtomicInteger(0);
-
-      int T1_number = 1;
-      int T2_number = 1;
-
-      final CountDownLatch latchAlign = new CountDownLatch(T1_number + T2_number);
-      final CountDownLatch latchReady = new CountDownLatch(1);
-      class T1 extends Thread {
-
-         T1() {
-            super("T1");
-         }
-
-         @Override
-         public void run() {
-            latchAlign.countDown();
-            try {
-               latchReady.await();
-            } catch (Exception ignored) {
-            }
-
-            for (int i = 0; i < RUNS; i++) {
-               try {
-                  ServerMessageImpl newMsg = (ServerMessageImpl) msg.copy();
-               } catch (Throwable e) {
-                  e.printStackTrace();
-                  errors.incrementAndGet();
-               }
-            }
-         }
-      }
-
-      class T2 extends Thread {
-
-         T2() {
-            super("T2");
-         }
-
-         @Override
-         public void run() {
-            latchAlign.countDown();
-            try {
-               latchReady.await();
-            } catch (Exception ignored) {
-            }
-
-            for (int i = 0; i < RUNS; i++) {
-               try {
-                  SessionSendMessage ssm = new SessionSendMessage(msg);
-                  ActiveMQBuffer buf = ssm.encode(null);
-                  System.out.println("reading at buf = " + buf);
-                  simulateRead(buf);
-               } catch (Throwable e) {
-                  e.printStackTrace();
-                  errors.incrementAndGet();
-               }
-            }
-         }
-      }
-
-      ArrayList<Thread> threads = new ArrayList<>();
-
-      for (int i = 0; i < T1_number; i++) {
-         T1 t = new T1();
-         threads.add(t);
-         t.start();
-      }
-
-      for (int i = 0; i < T2_number; i++) {
-         T2 t2 = new T2();
-         threads.add(t2);
-         t2.start();
-      }
-
-      latchAlign.await();
-
-      latchReady.countDown();
-
-      for (Thread t : threads) {
-         t.join();
-      }
-
-      Assert.assertEquals(0, errors.get());
-   }
-
-   private void simulateRead(ActiveMQBuffer buf) {
-      buf.setIndex(buf.capacity() / 2, buf.capacity() / 2);
-
-      // ok this is not actually happening during the read process, but changing this shouldn't affect the buffer on copy
-      buf.writeBytes(new byte[1024]);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 8456765..0860e97 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@@ -191,13 +191,13 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
          Assert.assertNull(message2);
 
          message = createMessage(session, 3);
-         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
+         message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
          producer.send(message);
          message2 = consumer.receive(1000);
          Assert.assertEquals(3, message2.getObjectProperty(propKey));
 
          message = createMessage(session, 4);
-         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
+         message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
          producer.send(message);
          message2 = consumer.receiveImmediate();
          Assert.assertNull(message2);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
index bbb9c26..138f3cc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
@@ -24,6 +24,7 @@ import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -35,6 +36,7 @@ import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Test;
 
 /**
@@ -119,7 +121,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
       assertEquals(1, queue.getMessageCount());
 
       // Receive and resend with OpenWire JMS client
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
       Connection jmsConnection = factory.createConnection();
       try {
          Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -129,7 +131,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
 
          Message received = jmsConsumer.receive(5000);
          assertNotNull(received);
-         assertTrue(received instanceof BytesMessage);
+         assertTrue(received instanceof ObjectMessage);
 
          MessageProducer jmsProducer = jmsSession.createProducer(destination);
          jmsProducer.send(received);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 0f006bc..70ff658 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -243,27 +243,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
    }
 
-   @Test
-   public void testAmbiguousMessageRouting() throws Exception {
-      final String addressA = "addressA";
-      final String queueA = "queueA";
-      final String queueB = "queueB";
-      final String queueC = "queueC";
-      final String queueD = "queueD";
-
-      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
-      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
-
-      sendMessages(addressA, 1);
-
-      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
-      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
-   }
-
    @Test(timeout = 60000)
    public void testMessageDurableFalse() throws Exception {
       sendMessages(getTestName(), 1, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 7962005..39daee4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -16,28 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -61,7 +39,24 @@ import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -71,14 +66,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -105,6 +99,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
+
 @RunWith(Parameterized.class)
 public class ProtonTest extends ProtonTestBase {
 
@@ -224,6 +223,7 @@ public class ProtonTest extends ProtonTestBase {
 
          TextMessage message = session.createTextMessage("test-message");
          producer.send(message);
+
          producer.close();
 
          connection.start();
@@ -378,7 +378,7 @@ public class ProtonTest extends ProtonTestBase {
          receiver.flow(1);
 
          // Shouldn't get this since we delayed the message.
-         assertNull(receiver.receive(5, TimeUnit.SECONDS));
+         assertNull(receiver.receive(1, TimeUnit.SECONDS));
       } finally {
          connection.close();
       }
@@ -827,12 +827,7 @@ public class ProtonTest extends ProtonTestBase {
          AmqpReceiver receiver = session.createReceiver(coreAddress);
          server.destroyQueue(new SimpleString(coreAddress), null, false, true);
 
-         Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisfied() throws Exception {
-               return receiver.isClosed();
-            }
-         });
+         Wait.waitFor(receiver::isClosed);
          assertTrue(receiver.isClosed());
       } finally {
          amqpConnection.close();
@@ -851,12 +846,7 @@ public class ProtonTest extends ProtonTestBase {
             connection.disconnect(true);
          }
 
-         Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisfied() throws Exception {
-               return amqpConnection.isClosed();
-            }
-         });
+         Wait.waitFor(amqpConnection::isClosed);
 
          assertTrue(amqpConnection.isClosed());
          assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
@@ -937,7 +927,7 @@ public class ProtonTest extends ProtonTestBase {
          request.setText("[]");
 
          sender.send(request);
-         AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS);
+         AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
          Assert.assertNotNull(response);
          assertNotNull(response);
          Object section = response.getWrappedMessage().getBody();
@@ -1001,12 +991,7 @@ public class ProtonTest extends ProtonTestBase {
       final ActiveMQServer remote = createAMQPServer(5673);
       remote.start();
       try {
-         Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisfied() throws Exception {
-               return remote.isActive();
-            }
-         });
+         Wait.waitFor(remote::isActive);
       } catch (Exception e) {
          remote.stop();
          throw e;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
index beac414..847b69e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
@@ -63,7 +63,8 @@ public class AckBatchSizeTest extends ActiveMQTestBase {
       ActiveMQServer server = createServer(false);
       server.start();
       int numMessages = 100;
-      ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * getMessageEncodeSize(addressA)).setBlockOnAcknowledge(true);
+      int originalSize = getMessageEncodeSize(addressA);
+      ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * originalSize).setBlockOnAcknowledge(true);
       ClientSessionFactory cf = createSessionFactory(locator);
       ClientSession sendSession = cf.createSession(false, true, true);
 
@@ -71,20 +72,25 @@ public class AckBatchSizeTest extends ActiveMQTestBase {
       session.createQueue(addressA, queueA, false);
       ClientProducer cp = sendSession.createProducer(addressA);
       for (int i = 0; i < numMessages; i++) {
-         cp.send(sendSession.createMessage(false));
+         ClientMessage message = (ClientMessage)sendSession.createMessage(false).setAddress(addressA);
+         Assert.assertEquals(originalSize, message.getEncodeSize());
+         cp.send(message);
+         Assert.assertEquals(originalSize, message.getEncodeSize());
       }
 
       ClientConsumer consumer = session.createConsumer(queueA);
       session.start();
       for (int i = 0; i < numMessages - 1; i++) {
+         System.out.println("Receive ");
          ClientMessage m = consumer.receive(5000);
-
+         Assert.assertEquals(0, m.getPropertyNames().size());
+         Assert.assertEquals("expected to be " + originalSize, originalSize, m.getEncodeSize());
          m.acknowledge();
       }
 
       ClientMessage m = consumer.receive(5000);
       Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable();
-      Assert.assertEquals(100, q.getDeliveringCount());
+      Assert.assertEquals(numMessages, q.getDeliveringCount());
       m.acknowledge();
       Assert.assertEquals(0, q.getDeliveringCount());
       sendSession.close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 0597dd5..d93807f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -21,10 +21,14 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -34,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -332,10 +337,133 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
    }
 
-   class FakeMessageWithID implements Message {
+   class FakeMessageWithID extends RefCountMessage {
 
       final long id;
 
+      @Override
+      public RoutingType getRouteType() {
+         return null;
+      }
+
+      @Override
+      public SimpleString getReplyTo() {
+         return null;
+      }
+
+      @Override
+      public Message setReplyTo(SimpleString address) {
+         return null;
+      }
+
+      @Override
+      public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+         return false;
+      }
+
+      @Override
+      public Object removeDeliveryAnnoationProperty(SimpleString key) {
+         return null;
+      }
+
+      @Override
+      public Object getDeliveryAnnotationProperty(SimpleString key) {
+         return null;
+      }
+
+      @Override
+      public int getPersistSize() {
+         return 0;
+      }
+
+      @Override
+      public void persist(ActiveMQBuffer targetRecord) {
+      }
+
+      @Override
+      public Persister<Message> getPersister() {
+         return null;
+      }
+
+      @Override
+      public void reloadPersistence(ActiveMQBuffer record) {
+
+      }
+
+      @Override
+      public Long getScheduledDeliveryTime() {
+         return null;
+      }
+
+      @Override
+      public ICoreMessage toCore() {
+         return null;
+      }
+
+      @Override
+      public void receiveBuffer(ByteBuf buffer) {
+
+      }
+
+      @Override
+      public void sendBuffer(ByteBuf buffer, int count) {
+
+      }
+      @Override
+      public Message setUserID(Object userID) {
+         return null;
+      }
+
+      @Override
+      public void messageChanged() {
+
+      }
+
+      @Override
+      public Message copy() {
+         return null;
+      }
+
+      @Override
+      public Message copy(long newID) {
+         return null;
+      }
+
+      @Override
+      public Message setMessageID(long id) {
+         return null;
+      }
+
+      @Override
+      public int getRefCount() {
+         return 0;
+      }
+
+      @Override
+      public int incrementRefCount() throws Exception {
+         return 0;
+      }
+
+      @Override
+      public int decrementRefCount() throws Exception {
+         return 0;
+      }
+
+      @Override
+      public int incrementDurableRefCount() {
+         return 0;
+      }
+
+      @Override
+      public int decrementDurableRefCount() {
+         return 0;
+      }
+
+      @Override
+      public int getMemoryEstimate() {
+         return 0;
+      }
+
       FakeMessageWithID(final long id) {
          this.id = id;
       }
@@ -351,23 +479,33 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public FakeMessageWithID setUserID(UUID userID) {
-         return this;
+      public String getAddress() {
+         return null;
       }
 
       @Override
-      public SimpleString getAddress() {
+      public SimpleString getAddressSimpleString() {
          return null;
       }
 
       @Override
-      public Message setAddress(SimpleString address) {
+      public Message setBuffer(ByteBuf buffer) {
          return null;
       }
 
       @Override
-      public byte getType() {
-         return 0;
+      public ByteBuf getBuffer() {
+         return null;
+      }
+
+      @Override
+      public Message setAddress(String address) {
+         return null;
+      }
+
+      @Override
+      public Message setAddress(SimpleString address) {
+         return null;
       }
 
       @Override
@@ -426,16 +564,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public ActiveMQBuffer getBodyBuffer() {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer getBodyBufferDuplicate() {
-         return null;
-      }
-
-      @Override
       public Message putBooleanProperty(SimpleString key, boolean value) {
          return null;
       }
@@ -689,15 +817,5 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       public Map<String, Object> toPropertyMap() {
          return null;
       }
-
-      @Override
-      public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) {
-         return this;
-      }
-
-      @Override
-      public FakeMessageWithID writeBodyBufferString(String string) {
-         return this;
-      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 8f00b2a..b957291 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -16,6 +16,18 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
@@ -27,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -41,10 +54,13 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,15 +70,17 @@ import org.junit.runners.Parameterized;
 @RunWith(value = Parameterized.class)
 public class ConsumerTest extends ActiveMQTestBase {
 
-   @Parameterized.Parameters(name = "isNetty={0}")
+   @Parameterized.Parameters(name = "isNetty={0}, persistent={1}")
    public static Collection getParameters() {
-      return Arrays.asList(new Object[][]{{true}, {false}});
+      return Arrays.asList(new Object[][]{{true, true}, {false, false}, {false, true}, {true, false}});
    }
 
-   public ConsumerTest(boolean netty) {
+   public ConsumerTest(boolean netty, boolean durable) {
       this.netty = netty;
+      this.durable = durable;
    }
 
+   private final boolean durable;
    private final boolean netty;
    private ActiveMQServer server;
 
@@ -79,13 +97,31 @@ public class ConsumerTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      server = createServer(false, isNetty());
+      server = createServer(durable, isNetty());
 
       server.start();
 
       locator = createFactory(isNetty());
    }
 
+   @Before
+   public void createQueue() throws Exception {
+
+      ServerLocator locator = createFactory(isNetty());
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true, true);
+
+      server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
+
+      session.close();
+
+      sf.close();
+
+      locator.close();
+   }
+
    @Test
    public void testStressConnection() throws Exception {
 
@@ -113,34 +149,220 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true, false);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
       ClientProducer producer = session.createProducer(QUEUE);
       ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
       message.getBodyBuffer().writeString("hi");
       message.putStringProperty("hello", "elo");
       producer.send(message);
 
+      session.commit();
+
+      session.close();
+      if (durable) {
+         server.stop();
+         server.start();
+      }
+      sf = createSessionFactory(locator);
+      session = sf.createSession(false, true, true, false);
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+
       session.start();
 
       if (cancelOnce) {
-         final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer;
+         final ClientConsumerInternal consumerInternal = (ClientConsumerInternal) consumer;
          Wait.waitFor(() -> consumerInternal.getBufferSize() > 0);
          consumer.close();
          consumer = session.createConsumer(QUEUE);
       }
       ClientMessage message2 = consumer.receive(1000);
 
+      Assert.assertNotNull(message2);
+
       System.out.println("Id::" + message2.getMessageID());
 
       System.out.println("Received " + message2);
 
+      System.out.println("Clie:" + ByteUtil.bytesToHex(message2.getBuffer().array(), 4));
+
+      System.out.println("String::" + message2.getReadOnlyBodyBuffer().readString());
+
+      Assert.assertEquals("elo", message2.getStringProperty("hello"));
+
+      Assert.assertEquals("hi", message2.getReadOnlyBodyBuffer().readString());
+
       session.close();
    }
 
+   @Test
+   public void testSendReceiveAMQP() throws Throwable {
+
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      internalSend(true, true);
+   }
+
+   @Test
+   public void testSendReceiveCore() throws Throwable {
+
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      internalSend(false, false);
+   }
+
+   @Test
+   public void testSendAMQPReceiveCore() throws Throwable {
+
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      internalSend(true, false);
+   }
+
+   @Test
+   public void testSendCoreReceiveAMQP() throws Throwable {
+
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      internalSend(false, true);
+   }
+
+
+
+   public static class MyTest implements Serializable {
+      int i;
+
+      public int getI() {
+         return i;
+      }
+
+      public MyTest setI(int i) {
+         this.i = i;
+         return this;
+      }
+   }
+
+
+   public void internalSend(boolean amqpSender, boolean amqpConsumer) throws Throwable {
+
+      ConnectionFactory factoryAMQP = new JmsConnectionFactory("amqp://localhost:61616");
+      ConnectionFactory factoryCore = new ActiveMQConnectionFactory();
+
+
+      Connection connection = (amqpSender ? factoryAMQP : factoryCore).createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(QUEUE.toString());
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         long time = System.currentTimeMillis();
+         int NUMBER_OF_MESSAGES = 100;
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage msg = session.createTextMessage("hello " + i);
+            msg.setIntProperty("mycount", i);
+            producer.send(msg);
+
+            ObjectMessage objectMessage = session.createObjectMessage(new MyTest().setI(i));
+            producer.send(objectMessage);
+
+            MapMessage mapMessage = session.createMapMessage();
+            mapMessage.setInt("intOne", i);
+            mapMessage.setString("stringOne", Integer.toString(i));
+            producer.send(mapMessage);
+
+            StreamMessage stream = session.createStreamMessage();
+            stream.writeBoolean(true);
+            stream.writeInt(i);
+            producer.send(stream);
+
+            BytesMessage bytes = session.createBytesMessage();
+            bytes.writeUTF("string " + i);
+            producer.send(bytes);
+         }
+         long end = System.currentTimeMillis();
+
+         System.out.println("Time = " + (end - time));
+
+         {
+            TextMessage dummyMessage = session.createTextMessage();
+            dummyMessage.setJMSType("car");
+            dummyMessage.setStringProperty("color", "red");
+            dummyMessage.setLongProperty("weight", 3000);
+            dummyMessage.setText("testSelectorExampleFromSpecs:1");
+            producer.send(dummyMessage);
+
+            dummyMessage = session.createTextMessage();
+            dummyMessage.setJMSType("car");
+            dummyMessage.setStringProperty("color", "blue");
+            dummyMessage.setLongProperty("weight", 3000);
+            dummyMessage.setText("testSelectorExampleFromSpecs:2");
+            producer.send(dummyMessage);
+         }
+
+
+
+
+         connection.close();
+
+         if (this.durable) {
+            server.stop();
+            server.start();
+         }
+
+         connection = (amqpConsumer ? factoryAMQP : factoryCore).createConnection();
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(QUEUE.toString());
+
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(i, message.getIntProperty("mycount"));
+            Assert.assertEquals("hello " + i, message.getText());
+
+            ObjectMessage objectMessage = (ObjectMessage)consumer.receive(5000);
+            Assert.assertNotNull(objectMessage);
+            Assert.assertEquals(i, ((MyTest)objectMessage.getObject()).getI());
+
+            MapMessage mapMessage = (MapMessage) consumer.receive(1000);
+            Assert.assertNotNull(mapMessage);
+            Assert.assertEquals(i, mapMessage.getInt("intOne"));
+            Assert.assertEquals(Integer.toString(i), mapMessage.getString("stringOne"));
+
+            StreamMessage stream = (StreamMessage)consumer.receive(5000);
+            Assert.assertTrue(stream.readBoolean());
+            Assert.assertEquals(i, stream.readInt());
+
+            BytesMessage bytes = (BytesMessage) consumer.receive(5000);
+            Assert.assertEquals("string " + i, bytes.readUTF());
+         }
+
+         consumer.close();
+
+         consumer = session.createConsumer(queue, "JMSType = 'car' AND color = 'blue' AND weight > 2500");
+
+         TextMessage msg = (TextMessage) consumer.receive(1000);
+         Assert.assertEquals("testSelectorExampleFromSpecs:2", msg.getText());
 
+      } finally {
+         connection.close();
+      }
+   }
 
    @Test
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
@@ -148,8 +370,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
       final int numMessages = 100;
@@ -180,8 +400,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, false, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
       final int numMessages = 100;
@@ -212,8 +430,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
       final int numMessages = 100;
@@ -247,8 +463,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
       final int numMessages = 100;
@@ -284,11 +498,9 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
-      final int numMessages = 10000;
+      final int numMessages = 100;
 
       for (int i = 0; i < numMessages; i++) {
          ClientMessage message = createTextMessage(session, "m" + i);
@@ -338,8 +550,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       session.start();
 
       ClientProducer producer = session.createProducer(QUEUE);
@@ -372,8 +582,6 @@ public class ConsumerTest extends ActiveMQTestBase {
       ClientSession session = sf.createSession(false, true, true);
       session.start();
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientConsumer consumer = session.createConsumer(QUEUE);
 
       consumer.setMessageHandler(new MessageHandler() {
@@ -394,8 +602,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createSession(false, true, true);
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientConsumer consumer = session.createConsumer(QUEUE);
 
       consumer.setMessageHandler(new MessageHandler() {
@@ -436,7 +642,7 @@ public class ConsumerTest extends ActiveMQTestBase {
 
          sessions.add(session);
 
-         session.createQueue(QUEUE, QUEUE.concat("" + i), null, false);
+         session.createQueue(QUEUE, QUEUE.concat("" + i), null, true);
 
          if (i == 0) {
             session.createQueue(QUEUE_RESPONSE, QUEUE_RESPONSE);
@@ -550,8 +756,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createTransactedSession();
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
       final int numMessages = 100;
@@ -598,7 +802,6 @@ public class ConsumerTest extends ActiveMQTestBase {
       ServerLocator locator = addServerLocator(ServerLocatorImpl.newLocator("vm:/1"));
       ClientSessionFactory factory = locator.createSessionFactory();
       ClientSession session = factory.createSession();
-      session.createQueue(QUEUE, QUEUE);
       ClientProducer producer = session.createProducer(QUEUE);
       producer.send(session.createMessage(true));
 
@@ -620,8 +823,6 @@ public class ConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = sf.createTransactedSession();
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = session.createProducer(QUEUE);
 
       final int numMessages = 100;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 81e0ca4..201a96b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -56,7 +58,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -519,7 +520,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
        */
       @Override
-      public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+      public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
          inCall.countDown();
          try {
             callbackSemaphore.acquire();
@@ -541,7 +542,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
        */
       @Override
       public int sendLargeMessage(MessageReference reference,
-                                  ServerMessage message,
+                                  Message message,
                                   ServerConsumer consumer,
                                   long bodySize,
                                   int deliveryCount) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java
index 450a361..d35f436 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -130,9 +130,9 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
 
          message.getBodyBuffer().clear();
 
-         Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().writerIndex());
+         Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().writerIndex());
 
-         Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex());
+         Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex());
       }
    }
 
@@ -148,6 +148,18 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
 
       Assert.assertNotNull(received);
 
+      ActiveMQBuffer buffer = received.getReadOnlyBodyBuffer();
+
+      Assert.assertEquals(body, buffer.readString());
+
+      try {
+         buffer.readByte();
+         Assert.fail("Should throw exception");
+      } catch (IndexOutOfBoundsException e) {
+         // OK
+      }
+
+
       Assert.assertEquals(body, received.getBodyBuffer().readString());
 
       try {
@@ -157,6 +169,18 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
       } catch (IndexOutOfBoundsException e) {
          // OK
       }
+
+      buffer = received.getReadOnlyBodyBuffer();
+
+      Assert.assertEquals(body, buffer.readString());
+
+      try {
+         buffer.readByte();
+         Assert.fail("Should throw exception");
+      } catch (IndexOutOfBoundsException e) {
+         // OK
+      }
+
    }
 
    @Test
@@ -167,7 +191,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
 
       message.getBodyBuffer().writeString(body);
 
-      Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex());
+      Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex());
 
       String body2 = message.getBodyBuffer().readString();
 
@@ -175,7 +199,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
 
       message.getBodyBuffer().resetReaderIndex();
 
-      Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex());
+      Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex());
 
       String body3 = message.getBodyBuffer().readString();
 
@@ -189,7 +213,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
 
       received.getBodyBuffer().resetReaderIndex();
 
-      Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, received.getBodyBuffer().readerIndex());
+      Assert.assertEquals(DataConstants.SIZE_INT, received.getBodyBuffer().readerIndex());
 
       String body4 = received.getBodyBuffer().readString();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 1950e12..540baf6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -53,10 +53,8 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -125,10 +123,10 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
       producer.send(clientFile);
 
       Thread.sleep(500);
-
-      for (ServerSession srvSession : server.getSessions()) {
-         ((ServerSessionImpl) srvSession).clearLargeMessage();
-      }
+//
+//      for (ServerSession srvSession : server.getSessions()) {
+//         ((ServerSessionImpl) srvSession).clearLargeMessage();
+//      }
 
       server.stop(false);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 3577a87..025d00a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -40,7 +40,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -350,7 +349,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
       ClientProducer producer = session.createProducer(ADDRESS);
 
-      Message clientFile = session.createMessage(true);
+      ClientMessage clientFile = session.createMessage(true);
       for (int i = 0; i < messageSize; i++) {
          clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
       }
@@ -890,7 +889,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
             Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
 
             if (isSimulateBridge) {
-               clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
+               clientFile.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
             } else {
                clientFile.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, someDuplicateInfo.getBytes());
             }


Mime
View raw message