Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 719B4200C3A for ; Thu, 2 Mar 2017 16:05:39 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 70105160B61; Thu, 2 Mar 2017 15:05:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E4631160B86 for ; Thu, 2 Mar 2017 16:05:36 +0100 (CET) Received: (qmail 56197 invoked by uid 500); 2 Mar 2017 15:05:36 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 55866 invoked by uid 99); 2 Mar 2017 15:05:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Mar 2017 15:05:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 73B83F0BE4; Thu, 2 Mar 2017 15:05:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 02 Mar 2017 15:05:46 -0000 Message-Id: <8d07027140d345d89850668cc84b9ec0@git.apache.org> In-Reply-To: <549cadbf627048ab90dbf12bb24d5fcc@git.apache.org> References: <549cadbf627048ab90dbf12bb24d5fcc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/29] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. archived-at: Thu, 02 Mar 2017 15:05:39 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 622f042..4da2e63 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import java.io.InputStream; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; @@ -31,27 +30,30 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; + +import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.encode.BodyType; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ReferenceCounter; -import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.UUID; import org.junit.Assert; import org.junit.Test; @@ -283,110 +285,91 @@ public class ScheduledDeliveryHandlerTest extends Assert { } } - class FakeMessage implements ServerMessage { + class FakeMessage extends RefCountMessage { - final long id; + @Override + public void persist(ActiveMQBuffer targetRecord) { - FakeMessage(final long id) { - this.id = id; } @Override - public FakeMessage setMessageID(long id) { - return this; - } + public void reloadPersistence(ActiveMQBuffer record) { - @Override - public long getMessageID() { - return id; } @Override - public MessageReference createReference(Queue queue) { + public Persister getPersister() { return null; } @Override - public void forceAddress(SimpleString address) { - - } - - @Override - public int incrementRefCount() throws Exception { + public int getPersistSize() { return 0; } + final long id; @Override - public int decrementRefCount() throws Exception { - return 0; + public Message toCore() { + return this; } @Override - public int incrementDurableRefCount() { - return 0; + public ActiveMQBuffer getReadOnlyBodyBuffer() { + return null; } - @Override - public int decrementDurableRefCount() { - return 0; + FakeMessage(final long id) { + this.id = id; } @Override - public ServerMessage copy(long newID) { - return null; + public FakeMessage setMessageID(long id) { + return this; } @Override - public ServerMessage copy() { - return null; + public long getMessageID() { + return id; } @Override - public int getMemoryEstimate() { + public int incrementRefCount() throws Exception { return 0; } @Override - public int getRefCount() { + public int decrementRefCount() throws Exception { return 0; } @Override - public ServerMessage makeCopyForExpiryOrDLA(long newID, - MessageReference originalReference, - boolean expiry, - boolean copyOriginalHeaders) throws Exception { - return null; - } - - @Override - public void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry) { - + public int incrementDurableRefCount() { + return 0; } @Override - public void setPagingStore(PagingStore store) { - + public int decrementDurableRefCount() { + return 0; } @Override - public PagingStore getPagingStore() { + public Message copy(long newID) { return null; } @Override - public boolean hasInternalProperties() { - return false; + public Message copy() { + return null; } @Override - public boolean storeIsPaging() { - return false; + public int getMemoryEstimate() { + return 0; } @Override - public void encodeMessageIDToBuffer() { - + public int getRefCount() { + return 0; } @Override @@ -400,97 +383,66 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void encode(ActiveMQBuffer buffer) { + public void messageChanged() { } - @Override - public void decode(ActiveMQBuffer buffer) { - - } - - @Override - public void decodeFromBuffer(ActiveMQBuffer buffer) { - - } - - @Override - public int getEndOfMessagePosition() { - return 0; - } - - @Override - public int getEndOfBodyPosition() { - return 0; - } - - @Override - public void bodyChanged() { - - } - - @Override - public boolean isServerMessage() { - return false; - } - - @Override - public ActiveMQBuffer getEncodedBuffer() { + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { return null; } @Override - public int getHeadersAndPropertiesEncodeSize() { - return 0; + public UUID getUserID() { + return null; } @Override - public ActiveMQBuffer getWholeBuffer() { + public String getAddress() { return null; } @Override - public void encodeHeadersAndProperties(ActiveMQBuffer buffer) { - + public SimpleString getAddressSimpleString() { + return null; } @Override - public void decodeHeadersAndProperties(ActiveMQBuffer buffer) { - + public Message setBuffer(ByteBuf buffer) { + return null; } @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { + public ByteBuf getBuffer() { return null; } @Override - public InputStream getBodyInputStream() { + public Object getProtocol() { return null; } @Override - public void setAddressTransient(SimpleString address) { - + public Message setProtocol(Object protocol) { + return null; } @Override - public TypedProperties getTypedProperties() { + public Object getBody() { return null; } @Override - public UUID getUserID() { + public BodyType getBodyType() { return null; } @Override - public FakeMessage setUserID(UUID userID) { - return this; + public Message setBody(BodyType type, Object body) { + return null; } @Override - public SimpleString getAddress() { + public Message setAddress(String address) { return null; } @@ -565,11 +517,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public ActiveMQBuffer getBodyBufferDuplicate() { - return null; - } - - @Override public Message putBooleanProperty(SimpleString key, boolean value) { return null; } @@ -825,13 +772,28 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public FakeMessage writeBodyBufferBytes(byte[] bytes) { - return this; + public Message setUserID(Object userID) { + return null; } @Override - public FakeMessage writeBodyBufferString(String string) { - return this; + public void copyHeadersAndProperties(Message msg) { + + } + + @Override + public Message setType(byte type) { + return null; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + @Override + public void sendBuffer(ByteBuf buffer, int count) { + } } @@ -1221,7 +1183,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public boolean hasMatchingConsumer(ServerMessage message) { + public boolean hasMatchingConsumer(Message message) { return false; } @@ -1338,12 +1300,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void route(ServerMessage message, RoutingContext context) throws Exception { + public void route(Message message, RoutingContext context) throws Exception { } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) { + public void routeWithAck(Message message, RoutingContext context) { } @@ -1366,5 +1328,9 @@ public class ScheduledDeliveryHandlerTest extends Assert { public void decDelivering(int size) { } + + + + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index ee80054..b1ea206 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -26,13 +26,13 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -323,7 +322,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void storeMessage(ServerMessage message) throws Exception { + public void storeMessage(Message message) throws Exception { } @@ -368,7 +367,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void storeMessageTransactional(long txID, ServerMessage message) throws Exception { + public void storeMessageTransactional(long txID, Message message) throws Exception { } @@ -439,7 +438,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception { + public LargeServerMessage createLargeMessage(long id, Message message) throws Exception { return null; } @@ -489,11 +488,6 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception { - - } - - @Override public void deletePageTransactional(long recordID) throws Exception { } @@ -643,7 +637,7 @@ public class TransactionImplTest extends ActiveMQTestBase { @Override public boolean addToPage(PagingStore store, - ServerMessage msg, + Message msg, Transaction tx, RouteContextList listCtx) throws Exception { return false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 2f12b05..aa64d9f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -94,6 +94,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -117,14 +118,12 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -855,7 +854,7 @@ public abstract class ActiveMQTestBase extends Assert { return testDir1 + "/journal"; } - protected String getJournalDir(final int index, final boolean backup) { + public String getJournalDir(final int index, final boolean backup) { return getJournalDir(getTestDir(), index, backup); } @@ -2079,8 +2078,8 @@ public abstract class ActiveMQTestBase extends Assert { } } - protected ServerMessage generateMessage(final long id) { - ServerMessage message = new ServerMessageImpl(id, 1000); + protected Message generateMessage(final long id) { + Message message = new CoreMessage(id, 1000); message.setMessageID(id); @@ -2092,9 +2091,9 @@ public abstract class ActiveMQTestBase extends Assert { } protected MessageReference generateReference(final Queue queue, final long id) { - ServerMessage message = generateMessage(id); + Message message = generateMessage(id); - return message.createReference(queue); + return MessageReference.Factory.createReference(message, queue); } protected int calculateRecordSize(final int size, final int alignment) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/test/resources/ConfigurationTest-full-config.xml ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 9ed5584..0691e95 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -231,8 +231,6 @@ 33 123 56546 - 5 - true 5000 95 54321 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-tools/src/test/resources/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index fcb7a20..090f968 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -673,22 +673,6 @@ - - - - XXX Only meant to be used by project developers - - - - - - - - XXX Only meant to be used by project developers - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java ---------------------------------------------------------------------- diff --git a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java index d7a7bdc..91fe808 100644 --- a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java +++ b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.jms.example; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.cluster.Transformer; public class HatColourChangeTransformer implements Transformer { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java ---------------------------------------------------------------------- diff --git a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java index 22272d0..32035e7 100644 --- a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java +++ b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.jms.example; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.cluster.Transformer; public class AddForwardingTimeTransformer implements Transformer { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5b056ad..7388068 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 3.6.9.Final 2.4 4.1.5.Final - 0.16.0 + 0.17.0 3.0.19.Final 1.7.21 0.11.0 @@ -1006,6 +1006,7 @@ -Xep:StaticAccessedFromInstance:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:WaitNotInLoop:ERROR + -Xdiags:verbose http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java index dea8602..d7d7f9d 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java @@ -52,6 +52,11 @@ public class UnmodifiableDelivery implements Delivery { } @Override + public int getDataLength() { + return delivery.getDataLength(); + } + + @Override public DeliveryState getLocalState() { return delivery.getLocalState(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java index 833302d..162a512 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; @@ -61,7 +62,7 @@ public class EncodersBench { this.byteBuffer.order(ByteOrder.nativeOrder()); this.addJournalRecordEncoder = new AddJournalRecordEncoder(); - this.record = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance); + this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance); this.record.setFileID(1); this.record.setCompactCount((short) 1); this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder())); @@ -86,7 +87,7 @@ public class EncodersBench { @Benchmark public int encodeUnalignedWithGarbage() { outBuffer.clear(); - final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance); + final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance); addRecord.setFileID(1); addRecord.setCompactCount((short) 1); addRecord.encode(outBuffer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java index ef71e89..03e2ddc 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.tests.extras.byteman; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; @@ -111,7 +111,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase { static int count = 20; static CountDownLatch stopLatch = new CountDownLatch(1); - public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits) { + public static void pause2(Message msgI, boolean sendBlocking, final ClientProducerCredits theCredits) { if (msgI.containsProperty("__AMQ_CID")) { count--; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java deleted file mode 100644 index 1ff58cd..0000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.byteman; - -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.utils.RandomUtil; -import org.jboss.byteman.contrib.bmunit.BMRule; -import org.jboss.byteman.contrib.bmunit.BMRules; -import org.jboss.byteman.contrib.bmunit.BMUnitRunner; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; - -@RunWith(BMUnitRunner.class) -public class MessageCopyTest { - - @Test - @BMRules( - - rules = {@BMRule( - name = "message-copy0", - targetClass = "org.apache.activemq.artemis.core.server.impl.ServerMessageImpl", - targetMethod = "copy()", - targetLocation = "ENTRY", - action = "System.out.println(\"copy\"), waitFor(\"encode-done\")"), @BMRule( - name = "message-copy-done", - targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage", - targetMethod = "encode(org.apache.activemq.artemis.spi.core.protocol.RemotingConnection)", - targetLocation = "EXIT", - action = "System.out.println(\"encodeDone\"), signalWake(\"encode-done\", true)"), @BMRule( - name = "message-copy1", - targetClass = "org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper", - targetMethod = "copy(int, int)", - condition = "Thread.currentThread().getName().equals(\"T1\")", - targetLocation = "EXIT", - action = "System.out.println(\"setIndex at \" + Thread.currentThread().getName()), waitFor(\"finish-read\")"), @BMRule( - name = "JMSServer.stop wait-init", - targetClass = "org.apache.activemq.artemis.tests.extras.byteman.MessageCopyTest", - targetMethod = "simulateRead", - targetLocation = "EXIT", - action = "signalWake(\"finish-read\", true)")}) - public void testMessageCopyIssue() throws Exception { - final long RUNS = 1; - final ServerMessageImpl msg = new ServerMessageImpl(123, 18); - - msg.setMessageID(RandomUtil.randomLong()); - msg.encodeMessageIDToBuffer(); - msg.setAddress(new SimpleString("Batatantkashf aksjfh aksfjh askfdjh askjfh ")); - - final AtomicInteger errors = new AtomicInteger(0); - - int T1_number = 1; - int T2_number = 1; - - final CountDownLatch latchAlign = new CountDownLatch(T1_number + T2_number); - final CountDownLatch latchReady = new CountDownLatch(1); - class T1 extends Thread { - - T1() { - super("T1"); - } - - @Override - public void run() { - latchAlign.countDown(); - try { - latchReady.await(); - } catch (Exception ignored) { - } - - for (int i = 0; i < RUNS; i++) { - try { - ServerMessageImpl newMsg = (ServerMessageImpl) msg.copy(); - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); - } - } - } - } - - class T2 extends Thread { - - T2() { - super("T2"); - } - - @Override - public void run() { - latchAlign.countDown(); - try { - latchReady.await(); - } catch (Exception ignored) { - } - - for (int i = 0; i < RUNS; i++) { - try { - SessionSendMessage ssm = new SessionSendMessage(msg); - ActiveMQBuffer buf = ssm.encode(null); - System.out.println("reading at buf = " + buf); - simulateRead(buf); - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); - } - } - } - } - - ArrayList threads = new ArrayList<>(); - - for (int i = 0; i < T1_number; i++) { - T1 t = new T1(); - threads.add(t); - t.start(); - } - - for (int i = 0; i < T2_number; i++) { - T2 t2 = new T2(); - threads.add(t2); - t2.start(); - } - - latchAlign.await(); - - latchReady.countDown(); - - for (Thread t : threads) { - t.join(); - } - - Assert.assertEquals(0, errors.get()); - } - - private void simulateRead(ActiveMQBuffer buf) { - buf.setIndex(buf.capacity() / 2, buf.capacity() / 2); - - // ok this is not actually happening during the read process, but changing this shouldn't affect the buffer on copy - buf.writeBytes(new byte[1024]); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java index 8456765..0860e97 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; @@ -191,13 +191,13 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { Assert.assertNull(message2); message = createMessage(session, 3); - message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); + message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); producer.send(message); message2 = consumer.receive(1000); Assert.assertEquals(3, message2.getObjectProperty(propKey)); message = createMessage(session, 4); - message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); + message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); producer.send(message); message2 = consumer.receiveImmediate(); Assert.assertNull(message2); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 7962005..aa1bdc4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -224,6 +224,7 @@ public class ProtonTest extends ProtonTestBase { TextMessage message = session.createTextMessage("test-message"); producer.send(message); + producer.close(); connection.start(); @@ -827,12 +828,7 @@ public class ProtonTest extends ProtonTestBase { AmqpReceiver receiver = session.createReceiver(coreAddress); server.destroyQueue(new SimpleString(coreAddress), null, false, true); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return receiver.isClosed(); - } - }); + Wait.waitFor(receiver::isClosed); assertTrue(receiver.isClosed()); } finally { amqpConnection.close(); @@ -851,12 +847,7 @@ public class ProtonTest extends ProtonTestBase { connection.disconnect(true); } - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return amqpConnection.isClosed(); - } - }); + Wait.waitFor(amqpConnection::isClosed); assertTrue(amqpConnection.isClosed()); assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition()); @@ -1001,12 +992,7 @@ public class ProtonTest extends ProtonTestBase { final ActiveMQServer remote = createAMQPServer(5673); remote.start(); try { - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return remote.isActive(); - } - }); + Wait.waitFor(remote::isActive); } catch (Exception e) { remote.stop(); throw e; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java index beac414..847b69e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java @@ -63,7 +63,8 @@ public class AckBatchSizeTest extends ActiveMQTestBase { ActiveMQServer server = createServer(false); server.start(); int numMessages = 100; - ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * getMessageEncodeSize(addressA)).setBlockOnAcknowledge(true); + int originalSize = getMessageEncodeSize(addressA); + ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * originalSize).setBlockOnAcknowledge(true); ClientSessionFactory cf = createSessionFactory(locator); ClientSession sendSession = cf.createSession(false, true, true); @@ -71,20 +72,25 @@ public class AckBatchSizeTest extends ActiveMQTestBase { session.createQueue(addressA, queueA, false); ClientProducer cp = sendSession.createProducer(addressA); for (int i = 0; i < numMessages; i++) { - cp.send(sendSession.createMessage(false)); + ClientMessage message = (ClientMessage)sendSession.createMessage(false).setAddress(addressA); + Assert.assertEquals(originalSize, message.getEncodeSize()); + cp.send(message); + Assert.assertEquals(originalSize, message.getEncodeSize()); } ClientConsumer consumer = session.createConsumer(queueA); session.start(); for (int i = 0; i < numMessages - 1; i++) { + System.out.println("Receive "); ClientMessage m = consumer.receive(5000); - + Assert.assertEquals(0, m.getPropertyNames().size()); + Assert.assertEquals("expected to be " + originalSize, originalSize, m.getEncodeSize()); m.acknowledge(); } ClientMessage m = consumer.receive(5000); Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable(); - Assert.assertEquals(100, q.getDeliveringCount()); + Assert.assertEquals(numMessages, q.getDeliveringCount()); m.acknowledge(); Assert.assertEquals(0, q.getDeliveringCount()); sendSession.close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 0597dd5..442d6e9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -21,10 +21,12 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -33,7 +35,10 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.encode.BodyType; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; @@ -332,10 +337,124 @@ public class AcknowledgeTest extends ActiveMQTestBase { } } - class FakeMessageWithID implements Message { + class FakeMessageWithID extends RefCountMessage { + + @Override + public int getPersistSize() { + return 0; + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + } + + @Override + public Persister getPersister() { + return null; + } + + @Override + public Message setProtocol(Object protocol) { + return this; + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + + } + + @Override + public Message toCore() { + return this; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + @Override + public void sendBuffer(ByteBuf buffer, int count) { + + } + + @Override + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { + return null; + } + + @Override + public Message setUserID(Object userID) { + return null; + } + + @Override + public void copyHeadersAndProperties(Message msg) { + + } + + @Override + public void messageChanged() { + + } + + @Override + public ActiveMQBuffer getReadOnlyBodyBuffer() { + return null; + } final long id; + @Override + public Message setType(byte type) { + return null; + } + + @Override + public Message copy() { + return null; + } + + @Override + public Message copy(long newID) { + return null; + } + + @Override + public Message setMessageID(long id) { + return null; + } + + @Override + public int getRefCount() { + return 0; + } + + @Override + public int incrementRefCount() throws Exception { + return 0; + } + + @Override + public int decrementRefCount() throws Exception { + return 0; + } + + @Override + public int incrementDurableRefCount() { + return 0; + } + + @Override + public int decrementDurableRefCount() { + return 0; + } + + @Override + public int getMemoryEstimate() { + return 0; + } + FakeMessageWithID(final long id) { this.id = id; } @@ -351,12 +470,47 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public FakeMessageWithID setUserID(UUID userID) { - return this; + public String getAddress() { + return null; + } + + @Override + public SimpleString getAddressSimpleString() { + return null; + } + + @Override + public Message setBuffer(ByteBuf buffer) { + return null; + } + + @Override + public ByteBuf getBuffer() { + return null; + } + + @Override + public Object getProtocol() { + return null; + } + + @Override + public Object getBody() { + return null; + } + + @Override + public BodyType getBodyType() { + return null; } @Override - public SimpleString getAddress() { + public Message setBody(BodyType type, Object body) { + return null; + } + + @Override + public Message setAddress(String address) { return null; } @@ -431,11 +585,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public ActiveMQBuffer getBodyBufferDuplicate() { - return null; - } - - @Override public Message putBooleanProperty(SimpleString key, boolean value) { return null; } @@ -689,15 +838,5 @@ public class AcknowledgeTest extends ActiveMQTestBase { public Map toPropertyMap() { return null; } - - @Override - public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) { - return this; - } - - @Override - public FakeMessageWithID writeBodyBufferString(String string) { - return this; - } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 8f00b2a..e2cf2a0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import java.util.Arrays; import java.util.Collection; import java.util.Set; @@ -27,6 +34,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -41,10 +49,13 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,15 +65,17 @@ import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class ConsumerTest extends ActiveMQTestBase { - @Parameterized.Parameters(name = "isNetty={0}") + @Parameterized.Parameters(name = "isNetty={0}, persistent={1}") public static Collection getParameters() { - return Arrays.asList(new Object[][]{{true}, {false}}); + return Arrays.asList(new Object[][]{{true, true}, {false, false}, {false, true}, {true, false}}); } - public ConsumerTest(boolean netty) { + public ConsumerTest(boolean netty, boolean durable) { this.netty = netty; + this.durable = durable; } + private final boolean durable; private final boolean netty; private ActiveMQServer server; @@ -79,13 +92,31 @@ public class ConsumerTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - server = createServer(false, isNetty()); + server = createServer(durable, isNetty()); server.start(); locator = createFactory(isNetty()); } + @Before + public void createQueue() throws Exception { + + ServerLocator locator = createFactory(isNetty()); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true, true); + + server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false); + + session.close(); + + sf.close(); + + locator.close(); + } + @Test public void testStressConnection() throws Exception { @@ -113,34 +144,123 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, false); - session.createQueue(QUEUE, QUEUE, null, false); - - ClientConsumer consumer = session.createConsumer(QUEUE); - ClientProducer producer = session.createProducer(QUEUE); ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4); message.getBodyBuffer().writeString("hi"); message.putStringProperty("hello", "elo"); producer.send(message); + session.commit(); + + session.close(); + if (durable) { + server.stop(); + server.start(); + } + sf = createSessionFactory(locator); + session = sf.createSession(false, true, true, false); + ClientConsumer consumer = session.createConsumer(QUEUE); + session.start(); if (cancelOnce) { - final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer; + final ClientConsumerInternal consumerInternal = (ClientConsumerInternal) consumer; Wait.waitFor(() -> consumerInternal.getBufferSize() > 0); consumer.close(); consumer = session.createConsumer(QUEUE); } ClientMessage message2 = consumer.receive(1000); + Assert.assertNotNull(message2); + System.out.println("Id::" + message2.getMessageID()); System.out.println("Received " + message2); + System.out.println("Clie:" + ByteUtil.bytesToHex(message2.getBuffer().array(), 4)); + + System.out.println("String::" + message2.getReadOnlyBodyBuffer().readString()); + + Assert.assertEquals("elo", message2.getStringProperty("hello")); + + Assert.assertEquals("hi", message2.getReadOnlyBodyBuffer().readString()); + session.close(); } + @Test + public void testSendReceiveAMQP() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(true); + } + + @Test + public void testSendReceiveCore() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(false); + } + + public void internalSend(boolean amqp) throws Throwable { + + ConnectionFactory factory; + + if (amqp) { + factory = new JmsConnectionFactory("amqp://localhost:61616"); + } else { + factory = new ActiveMQConnectionFactory(); + } + + + Connection connection = factory.createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(QUEUE.toString()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + long time = System.currentTimeMillis(); + int NUMBER_OF_MESSAGES = 100; + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("hello " + i)); + } + long end = System.currentTimeMillis(); + + System.out.println("Time = " + (end - time)); + + connection.close(); + + if (this.durable) { + server.stop(); + server.start(); + } + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("hello " + i, message.getText()); + } + } finally { + connection.close(); + } + } @Test public void testConsumerAckImmediateAutoCommitTrue() throws Exception { @@ -148,8 +268,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -180,8 +298,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, false, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -212,8 +328,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -247,8 +361,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -284,11 +396,9 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); - final int numMessages = 10000; + final int numMessages = 100; for (int i = 0; i < numMessages; i++) { ClientMessage message = createTextMessage(session, "m" + i); @@ -338,8 +448,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - session.start(); ClientProducer producer = session.createProducer(QUEUE); @@ -372,8 +480,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); session.start(); - session.createQueue(QUEUE, QUEUE, null, false); - ClientConsumer consumer = session.createConsumer(QUEUE); consumer.setMessageHandler(new MessageHandler() { @@ -394,8 +500,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientConsumer consumer = session.createConsumer(QUEUE); consumer.setMessageHandler(new MessageHandler() { @@ -436,7 +540,7 @@ public class ConsumerTest extends ActiveMQTestBase { sessions.add(session); - session.createQueue(QUEUE, QUEUE.concat("" + i), null, false); + session.createQueue(QUEUE, QUEUE.concat("" + i), null, true); if (i == 0) { session.createQueue(QUEUE_RESPONSE, QUEUE_RESPONSE); @@ -550,8 +654,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createTransactedSession(); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -598,7 +700,6 @@ public class ConsumerTest extends ActiveMQTestBase { ServerLocator locator = addServerLocator(ServerLocatorImpl.newLocator("vm:/1")); ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(); - session.createQueue(QUEUE, QUEUE); ClientProducer producer = session.createProducer(QUEUE); producer.send(session.createMessage(true)); @@ -620,8 +721,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createTransactedSession(); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 81e0ca4..201a96b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -56,7 +58,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; @@ -519,7 +520,7 @@ public class HangConsumerTest extends ActiveMQTestBase { * @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int) */ @Override - public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { inCall.countDown(); try { callbackSemaphore.acquire(); @@ -541,7 +542,7 @@ public class HangConsumerTest extends ActiveMQTestBase { */ @Override public int sendLargeMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java index 450a361..d35f436 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InVMNonPersistentMessageBufferTest.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -130,9 +130,9 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase { message.getBodyBuffer().clear(); - Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().writerIndex()); + Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().writerIndex()); - Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex()); + Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex()); } } @@ -148,6 +148,18 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase { Assert.assertNotNull(received); + ActiveMQBuffer buffer = received.getReadOnlyBodyBuffer(); + + Assert.assertEquals(body, buffer.readString()); + + try { + buffer.readByte(); + Assert.fail("Should throw exception"); + } catch (IndexOutOfBoundsException e) { + // OK + } + + Assert.assertEquals(body, received.getBodyBuffer().readString()); try { @@ -157,6 +169,18 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase { } catch (IndexOutOfBoundsException e) { // OK } + + buffer = received.getReadOnlyBodyBuffer(); + + Assert.assertEquals(body, buffer.readString()); + + try { + buffer.readByte(); + Assert.fail("Should throw exception"); + } catch (IndexOutOfBoundsException e) { + // OK + } + } @Test @@ -167,7 +191,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase { message.getBodyBuffer().writeString(body); - Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex()); + Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex()); String body2 = message.getBodyBuffer().readString(); @@ -175,7 +199,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase { message.getBodyBuffer().resetReaderIndex(); - Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex()); + Assert.assertEquals(DataConstants.SIZE_INT, message.getBodyBuffer().readerIndex()); String body3 = message.getBodyBuffer().readString(); @@ -189,7 +213,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase { received.getBodyBuffer().resetReaderIndex(); - Assert.assertEquals(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, received.getBodyBuffer().readerIndex()); + Assert.assertEquals(DataConstants.SIZE_INT, received.getBodyBuffer().readerIndex()); String body4 = received.getBodyBuffer().readString(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 1950e12..540baf6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -53,10 +53,8 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; -import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -125,10 +123,10 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { producer.send(clientFile); Thread.sleep(500); - - for (ServerSession srvSession : server.getSessions()) { - ((ServerSessionImpl) srvSession).clearLargeMessage(); - } +// +// for (ServerSession srvSession : server.getSessions()) { +// ((ServerSessionImpl) srvSession).clearLargeMessage(); +// } server.stop(false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index 3577a87..5e822eb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -40,7 +40,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -890,7 +890,7 @@ public class LargeMessageTest extends LargeMessageTestBase { Message clientFile = createLargeClientMessageStreaming(session, messageSize, true); if (isSimulateBridge) { - clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes()); + clientFile.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes()); } else { clientFile.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, someDuplicateInfo.getBytes()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 87f9255..b0f03d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -68,7 +69,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; @@ -1885,12 +1885,8 @@ public class BridgeTest extends ActiveMQTestBase { final String BRIDGE = "myBridge"; ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(); - Transformer transformer = new Transformer() { - @Override - public ServerMessage transform(ServerMessage message) { - return null; - } - }; + Transformer transformer = (Message encode) -> null; + serviceRegistry.addBridgeTransformer(BRIDGE, transformer); Configuration config = createDefaultInVMConfig().addConnectorConfiguration("in-vm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); ActiveMQServer server = addServer(new ActiveMQServerImpl(config, null, null, null, serviceRegistry)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/SimpleTransformer.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/SimpleTransformer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/SimpleTransformer.java index d9a817e..c0487d0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/SimpleTransformer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/SimpleTransformer.java @@ -16,40 +16,43 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.bridge; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.cluster.Transformer; public class SimpleTransformer implements Transformer { @Override - public ServerMessage transform(final ServerMessage message) { - SimpleString oldProp = (SimpleString) message.getObjectProperty(new SimpleString("wibble")); - - if (!oldProp.equals(new SimpleString("bing"))) { - throw new IllegalStateException("Wrong property value!!"); - } - - // Change a property - message.putStringProperty(new SimpleString("wibble"), new SimpleString("bong")); - - // Change the body - ActiveMQBuffer buffer = message.getBodyBuffer(); - - buffer.readerIndex(0); - - String str = buffer.readString(); - - if (!str.equals("doo be doo be doo be doo")) { - throw new IllegalStateException("Wrong body!!"); - } - - buffer.clear(); - - buffer.writeString("dee be dee be dee be dee"); - - return message; + public Message transform(final Message message) { + + // TODO-now: fix this test!!! + + throw new RuntimeException(("Fix me")); +// SimpleString oldProp = (SimpleString) message.getObjectProperty(new SimpleString("wibble")); +// +// if (!oldProp.equals(new SimpleString("bing"))) { +// throw new IllegalStateException("Wrong property value!!"); +// } +// +// // Change a property +// message.putStringProperty(new SimpleString("wibble"), new SimpleString("bong")); +// +// // Change the body +// ActiveMQBuffer buffer = message.getBodyBuffer(); +// +// buffer.readerIndex(0); +// +// String str = buffer.readString(); +// +// if (!str.equals("doo be doo be doo be doo")) { +// throw new IllegalStateException("Wrong body!!"); +// } +// +// buffer.clear(); +// +// buffer.writeString("dee be dee be dee be dee"); +// +// return message; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java index 26bcb43..8766057 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java @@ -16,12 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.junit.Before; import org.junit.Test; @@ -83,7 +84,7 @@ public class ClusterHeadersRemovedTest extends ClusterTestBase { assertNotNull(message); - assertFalse(message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS)); + assertFalse(message.containsProperty(Message.HDR_ROUTE_TO_IDS)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index de5fe33..0b0fa00 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -28,7 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; @@ -474,7 +474,7 @@ public class MessageRedistributionTest extends ClusterTestBase { bb.putLong(i); - msg.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes); + msg.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes); prod0.send(msg); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java index 510fa68..69a360e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.Message; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -35,7 +36,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; @@ -1301,7 +1302,7 @@ public class DivertTest extends ActiveMQTestBase { ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(); Transformer transformer = new Transformer() { @Override - public ServerMessage transform(ServerMessage message) { + public Message transform(Message message) { return null; } }; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java index 43a4ad9..eff1615 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java @@ -50,7 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; @@ -88,7 +88,7 @@ public class InterceptorTest extends ActiveMQTestBase { if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage p = (SessionSendMessage) packet; - ServerMessage sm = (ServerMessage) p.getMessage(); + Message sm = p.getMessage(); sm.putStringProperty(InterceptorTest.key, "orange"); } @@ -165,7 +165,7 @@ public class InterceptorTest extends ActiveMQTestBase { if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) { SessionReceiveMessage p = (SessionReceiveMessage) packet; - ServerMessage sm = (ServerMessage) p.getMessage(); + Message sm = p.getMessage(); sm.putStringProperty(InterceptorTest.key, "orange"); } @@ -319,7 +319,7 @@ public class InterceptorTest extends ActiveMQTestBase { if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage p = (SessionSendMessage) packet; - ServerMessage sm = (ServerMessage) p.getMessage(); + Message sm = p.getMessage(); sm.putIntProperty(key, num);