Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 29C8A200C4B for ; Sun, 5 Mar 2017 17:50:06 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 28447160B57; Sun, 5 Mar 2017 16:50:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DFAA6160B7D for ; Sun, 5 Mar 2017 17:50:03 +0100 (CET) Received: (qmail 6752 invoked by uid 500); 5 Mar 2017 16:50:03 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 6528 invoked by uid 99); 5 Mar 2017 16:50:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Mar 2017 16:50:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D747DFF40; Sun, 5 Mar 2017 16:50:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Sun, 05 Mar 2017 16:50:04 -0000 Message-Id: <204df300a90a44fa89b6dc135dcf8ab2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. archived-at: Sun, 05 Mar 2017 16:50:06 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index f374979..5e9a95a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -77,7 +77,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase { Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO()); Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO()); Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(), conf.isLogJournalWriteRate()); - Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), conf.getJournalPerfBlastPages()); Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(), conf.isMessageCounterEnabled()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterMaxDayHistory(), conf.getMessageCounterMaxDayHistory()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterSamplePeriod(), conf.getMessageCounterSamplePeriod()); @@ -232,10 +231,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase { conf.setLogJournalWriteRate(b); Assert.assertEquals(b, conf.isLogJournalWriteRate()); - i = RandomUtil.randomInt(); - conf.setJournalPerfBlastPages(i); - Assert.assertEquals(i, conf.getJournalPerfBlastPages()); - l = RandomUtil.randomLong(); conf.setServerDumpInterval(l); Assert.assertEquals(l, conf.getServerDumpInterval()); @@ -434,10 +429,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase { conf.setLogJournalWriteRate(b); Assert.assertEquals(b, conf.isLogJournalWriteRate()); - i = RandomUtil.randomInt(); - conf.setJournalPerfBlastPages(i); - Assert.assertEquals(i, conf.getJournalPerfBlastPages()); - l = RandomUtil.randomLong(); conf.setServerDumpInterval(l); Assert.assertEquals(l, conf.getServerDumpInterval()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java index d73accd..1eb749b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.filter.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.tests.util.SilentTestCase; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; @@ -35,13 +35,13 @@ public class FilterTest extends SilentTestCase { private Filter filter; - private ServerMessage message; + private Message message; @Override @Before public void setUp() throws Exception { super.setUp(); - message = new ServerMessageImpl(1, 1000); + message = new CoreMessage().initBuffer(1024).setMessageID(1); } @Test @@ -59,7 +59,7 @@ public class FilterTest extends SilentTestCase { message.putStringProperty(new SimpleString("color"), new SimpleString("RED")); Assert.assertTrue(filter.match(message)); - message = new ServerMessageImpl(); + message = new CoreMessage(); Assert.assertFalse(filter.match(message)); } @@ -94,7 +94,7 @@ public class FilterTest extends SilentTestCase { filter = FilterImpl.createFilter(new SimpleString("AMQDurable='NON_DURABLE'")); - message = new ServerMessageImpl(); + message = new CoreMessage(); message.setDurable(true); Assert.assertFalse(filter.match(message)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index 0e9a3f2..2f18c21 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -23,6 +23,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -43,8 +46,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -329,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase { } @Override - public ServerMessage handleMessage(ServerMessage message) throws Exception { + public ICoreMessage handleMessage(Message message) throws Exception { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 622f042..2bd8cb2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import java.io.InputStream; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; @@ -31,27 +30,26 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ReferenceCounter; -import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.UUID; import org.junit.Assert; import org.junit.Test; @@ -283,214 +281,164 @@ public class ScheduledDeliveryHandlerTest extends Assert { } } - class FakeMessage implements ServerMessage { - - final long id; - - FakeMessage(final long id) { - this.id = id; - } - - @Override - public FakeMessage setMessageID(long id) { - return this; - } - - @Override - public long getMessageID() { - return id; - } + class FakeMessage extends RefCountMessage { @Override - public MessageReference createReference(Queue queue) { + public RoutingType getRouteType() { return null; } @Override - public void forceAddress(SimpleString address) { - - } - - @Override - public int incrementRefCount() throws Exception { - return 0; - } - - @Override - public int decrementRefCount() throws Exception { - return 0; + public SimpleString getReplyTo() { + return null; } @Override - public int incrementDurableRefCount() { - return 0; + public Message setReplyTo(SimpleString address) { + return null; } @Override - public int decrementDurableRefCount() { - return 0; + public boolean containsDeliveryAnnotationProperty(SimpleString property) { + return false; } @Override - public ServerMessage copy(long newID) { + public Object removeDeliveryAnnoationProperty(SimpleString key) { return null; } @Override - public ServerMessage copy() { + public Object getDeliveryAnnotationProperty(SimpleString key) { return null; } @Override - public int getMemoryEstimate() { - return 0; - } + public void persist(ActiveMQBuffer targetRecord) { - @Override - public int getRefCount() { - return 0; } @Override - public ServerMessage makeCopyForExpiryOrDLA(long newID, - MessageReference originalReference, - boolean expiry, - boolean copyOriginalHeaders) throws Exception { + public Long getScheduledDeliveryTime() { return null; } @Override - public void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry) { - - } - - @Override - public void setPagingStore(PagingStore store) { + public void reloadPersistence(ActiveMQBuffer record) { } @Override - public PagingStore getPagingStore() { + public Persister getPersister() { return null; } @Override - public boolean hasInternalProperties() { - return false; - } - - @Override - public boolean storeIsPaging() { - return false; + public int getPersistSize() { + return 0; } + final long id; @Override - public void encodeMessageIDToBuffer() { - + public CoreMessage toCore() { + return null; } - @Override - public byte[] getDuplicateIDBytes() { - return new byte[0]; + FakeMessage(final long id) { + this.id = id; } @Override - public Object getDuplicateProperty() { - return null; + public FakeMessage setMessageID(long id) { + return this; } @Override - public void encode(ActiveMQBuffer buffer) { - + public long getMessageID() { + return id; } @Override - public void decode(ActiveMQBuffer buffer) { - + public int incrementRefCount() throws Exception { + return 0; } @Override - public void decodeFromBuffer(ActiveMQBuffer buffer) { - + public int decrementRefCount() throws Exception { + return 0; } @Override - public int getEndOfMessagePosition() { + public int incrementDurableRefCount() { return 0; } @Override - public int getEndOfBodyPosition() { + public int decrementDurableRefCount() { return 0; } @Override - public void bodyChanged() { - + public Message copy(long newID) { + return null; } @Override - public boolean isServerMessage() { - return false; + public Message copy() { + return null; } @Override - public ActiveMQBuffer getEncodedBuffer() { - return null; + public int getMemoryEstimate() { + return 0; } @Override - public int getHeadersAndPropertiesEncodeSize() { + public int getRefCount() { return 0; } @Override - public ActiveMQBuffer getWholeBuffer() { - return null; + public byte[] getDuplicateIDBytes() { + return new byte[0]; } @Override - public void encodeHeadersAndProperties(ActiveMQBuffer buffer) { - + public Object getDuplicateProperty() { + return null; } @Override - public void decodeHeadersAndProperties(ActiveMQBuffer buffer) { + public void messageChanged() { } @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { + public UUID getUserID() { return null; } @Override - public InputStream getBodyInputStream() { + public String getAddress() { return null; } @Override - public void setAddressTransient(SimpleString address) { - - } - - @Override - public TypedProperties getTypedProperties() { + public SimpleString getAddressSimpleString() { return null; } @Override - public UUID getUserID() { + public Message setBuffer(ByteBuf buffer) { return null; } @Override - public FakeMessage setUserID(UUID userID) { - return this; + public ByteBuf getBuffer() { + return null; } - @Override - public SimpleString getAddress() { + public Message setAddress(String address) { return null; } @@ -500,11 +448,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public byte getType() { - return 0; - } - - @Override public boolean isDurable() { return false; } @@ -560,16 +503,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public ActiveMQBuffer getBodyBuffer() { - return null; - } - - @Override - public ActiveMQBuffer getBodyBufferDuplicate() { - return null; - } - - @Override public Message putBooleanProperty(SimpleString key, boolean value) { return null; } @@ -825,13 +758,23 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public FakeMessage writeBodyBufferBytes(byte[] bytes) { - return this; + public Message setUserID(Object userID) { + return null; } @Override - public FakeMessage writeBodyBufferString(String string) { - return this; + public void copyHeadersAndProperties(Message msg) { + + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + @Override + public void sendBuffer(ByteBuf buffer, int count) { + } } @@ -1221,7 +1164,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public boolean hasMatchingConsumer(ServerMessage message) { + public boolean hasMatchingConsumer(Message message) { return false; } @@ -1338,12 +1281,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void route(ServerMessage message, RoutingContext context) throws Exception { + public void route(Message message, RoutingContext context) throws Exception { } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) { + public void routeWithAck(Message message, RoutingContext context) { } @@ -1366,5 +1309,9 @@ public class ScheduledDeliveryHandlerTest extends Assert { public void decDelivering(int size) { } + + + + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index ee80054..b1ea206 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -26,13 +26,13 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -323,7 +322,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void storeMessage(ServerMessage message) throws Exception { + public void storeMessage(Message message) throws Exception { } @@ -368,7 +367,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void storeMessageTransactional(long txID, ServerMessage message) throws Exception { + public void storeMessageTransactional(long txID, Message message) throws Exception { } @@ -439,7 +438,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception { + public LargeServerMessage createLargeMessage(long id, Message message) throws Exception { return null; } @@ -489,11 +488,6 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception { - - } - - @Override public void deletePageTransactional(long recordID) throws Exception { } @@ -643,7 +637,7 @@ public class TransactionImplTest extends ActiveMQTestBase { @Override public boolean addToPage(PagingStore store, - ServerMessage msg, + Message msg, Transaction tx, RouteContextList listCtx) throws Exception { return false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 2f12b05..0bb177d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; @@ -94,6 +95,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -117,14 +119,12 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -855,7 +855,7 @@ public abstract class ActiveMQTestBase extends Assert { return testDir1 + "/journal"; } - protected String getJournalDir(final int index, final boolean backup) { + public String getJournalDir(final int index, final boolean backup) { return getJournalDir(getTestDir(), index, backup); } @@ -2079,8 +2079,8 @@ public abstract class ActiveMQTestBase extends Assert { } } - protected ServerMessage generateMessage(final long id) { - ServerMessage message = new ServerMessageImpl(id, 1000); + protected Message generateMessage(final long id) { + ICoreMessage message = new CoreMessage(id, 1000); message.setMessageID(id); @@ -2092,9 +2092,9 @@ public abstract class ActiveMQTestBase extends Assert { } protected MessageReference generateReference(final Queue queue, final long id) { - ServerMessage message = generateMessage(id); + Message message = generateMessage(id); - return message.createReference(queue); + return MessageReference.Factory.createReference(message, queue); } protected int calculateRecordSize(final int size, final int alignment) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/resources/ConfigurationTest-full-config.xml ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 9ed5584..0691e95 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -231,8 +231,6 @@ 33 123 56546 - 5 - true 5000 95 54321 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-tools/src/test/resources/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index fcb7a20..090f968 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -673,22 +673,6 @@ - - - - XXX Only meant to be used by project developers - - - - - - - - XXX Only meant to be used by project developers - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java ---------------------------------------------------------------------- diff --git a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java index d7a7bdc..df9d79e 100644 --- a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java +++ b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java @@ -16,14 +16,14 @@ */ package org.apache.activemq.artemis.jms.example; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Transformer; public class HatColourChangeTransformer implements Transformer { @Override - public ServerMessage transform(final ServerMessage message) { + public Message transform(final Message message) { SimpleString propName = new SimpleString("hat"); SimpleString oldProp = message.getSimpleStringProperty(propName); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java ---------------------------------------------------------------------- diff --git a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java index 22272d0..2f75d4c 100644 --- a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java +++ b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java @@ -16,16 +16,13 @@ */ package org.apache.activemq.artemis.jms.example; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.cluster.Transformer; public class AddForwardingTimeTransformer implements Transformer { @Override - public ServerMessage transform(final ServerMessage message) { - message.putLongProperty(new SimpleString("time_of_forward"), System.currentTimeMillis()); - + public Message transform(final Message message) { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5b056ad..e9a5bad 100644 --- a/pom.xml +++ b/pom.xml @@ -82,11 +82,11 @@ 9.4.0.M1 3.6.9.Final 2.4 - 4.1.5.Final - 0.16.0 + 4.1.8.Final + 0.17.0 3.0.19.Final 1.7.21 - 0.11.0 + 0.20.0 0.9.5 1.0-alpha-1 1 @@ -1006,6 +1006,7 @@ -Xep:StaticAccessedFromInstance:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:WaitNotInLoop:ERROR + -Xdiags:verbose http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java index 12f5568..17f601a 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java @@ -37,6 +37,11 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator { } @Override + public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { + return POOLED.calculateNewCapacity(minNewCapacity, maxCapacity); + } + + @Override public ByteBuf buffer() { return UNPOOLED.heapBuffer(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java index dea8602..d9bddcb 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java @@ -51,6 +51,12 @@ public class UnmodifiableDelivery implements Delivery { } } + /* waiting Pull Request sent + @Override + public int getDataLength() { + return delivery.getDataLength(); + } */ + @Override public DeliveryState getLocalState() { return delivery.getLocalState(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java index 833302d..162a512 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; @@ -61,7 +62,7 @@ public class EncodersBench { this.byteBuffer.order(ByteOrder.nativeOrder()); this.addJournalRecordEncoder = new AddJournalRecordEncoder(); - this.record = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance); + this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance); this.record.setFileID(1); this.record.setCompactCount((short) 1); this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder())); @@ -86,7 +87,7 @@ public class EncodersBench { @Benchmark public int encodeUnalignedWithGarbage() { outBuffer.clear(); - final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance); + final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance); addRecord.setFileID(1); addRecord.setCompactCount((short) 1); addRecord.encode(outBuffer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java index ef71e89..03e2ddc 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.tests.extras.byteman; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; @@ -111,7 +111,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase { static int count = 20; static CountDownLatch stopLatch = new CountDownLatch(1); - public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits) { + public static void pause2(Message msgI, boolean sendBlocking, final ClientProducerCredits theCredits) { if (msgI.containsProperty("__AMQ_CID")) { count--; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java deleted file mode 100644 index 1ff58cd..0000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.byteman; - -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.utils.RandomUtil; -import org.jboss.byteman.contrib.bmunit.BMRule; -import org.jboss.byteman.contrib.bmunit.BMRules; -import org.jboss.byteman.contrib.bmunit.BMUnitRunner; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; - -@RunWith(BMUnitRunner.class) -public class MessageCopyTest { - - @Test - @BMRules( - - rules = {@BMRule( - name = "message-copy0", - targetClass = "org.apache.activemq.artemis.core.server.impl.ServerMessageImpl", - targetMethod = "copy()", - targetLocation = "ENTRY", - action = "System.out.println(\"copy\"), waitFor(\"encode-done\")"), @BMRule( - name = "message-copy-done", - targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage", - targetMethod = "encode(org.apache.activemq.artemis.spi.core.protocol.RemotingConnection)", - targetLocation = "EXIT", - action = "System.out.println(\"encodeDone\"), signalWake(\"encode-done\", true)"), @BMRule( - name = "message-copy1", - targetClass = "org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper", - targetMethod = "copy(int, int)", - condition = "Thread.currentThread().getName().equals(\"T1\")", - targetLocation = "EXIT", - action = "System.out.println(\"setIndex at \" + Thread.currentThread().getName()), waitFor(\"finish-read\")"), @BMRule( - name = "JMSServer.stop wait-init", - targetClass = "org.apache.activemq.artemis.tests.extras.byteman.MessageCopyTest", - targetMethod = "simulateRead", - targetLocation = "EXIT", - action = "signalWake(\"finish-read\", true)")}) - public void testMessageCopyIssue() throws Exception { - final long RUNS = 1; - final ServerMessageImpl msg = new ServerMessageImpl(123, 18); - - msg.setMessageID(RandomUtil.randomLong()); - msg.encodeMessageIDToBuffer(); - msg.setAddress(new SimpleString("Batatantkashf aksjfh aksfjh askfdjh askjfh ")); - - final AtomicInteger errors = new AtomicInteger(0); - - int T1_number = 1; - int T2_number = 1; - - final CountDownLatch latchAlign = new CountDownLatch(T1_number + T2_number); - final CountDownLatch latchReady = new CountDownLatch(1); - class T1 extends Thread { - - T1() { - super("T1"); - } - - @Override - public void run() { - latchAlign.countDown(); - try { - latchReady.await(); - } catch (Exception ignored) { - } - - for (int i = 0; i < RUNS; i++) { - try { - ServerMessageImpl newMsg = (ServerMessageImpl) msg.copy(); - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); - } - } - } - } - - class T2 extends Thread { - - T2() { - super("T2"); - } - - @Override - public void run() { - latchAlign.countDown(); - try { - latchReady.await(); - } catch (Exception ignored) { - } - - for (int i = 0; i < RUNS; i++) { - try { - SessionSendMessage ssm = new SessionSendMessage(msg); - ActiveMQBuffer buf = ssm.encode(null); - System.out.println("reading at buf = " + buf); - simulateRead(buf); - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); - } - } - } - } - - ArrayList threads = new ArrayList<>(); - - for (int i = 0; i < T1_number; i++) { - T1 t = new T1(); - threads.add(t); - t.start(); - } - - for (int i = 0; i < T2_number; i++) { - T2 t2 = new T2(); - threads.add(t2); - t2.start(); - } - - latchAlign.await(); - - latchReady.countDown(); - - for (Thread t : threads) { - t.join(); - } - - Assert.assertEquals(0, errors.get()); - } - - private void simulateRead(ActiveMQBuffer buf) { - buf.setIndex(buf.capacity() / 2, buf.capacity() / 2); - - // ok this is not actually happening during the read process, but changing this shouldn't affect the buffer on copy - buf.writeBytes(new byte[1024]); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java index 8456765..0860e97 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; @@ -191,13 +191,13 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { Assert.assertNull(message2); message = createMessage(session, 3); - message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); + message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); producer.send(message); message2 = consumer.receive(1000); Assert.assertEquals(3, message2.getObjectProperty(propKey)); message = createMessage(session, 4); - message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); + message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData()); producer.send(message); message2 = consumer.receiveImmediate(); Assert.assertNull(message2); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java index bbb9c26..138f3cc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java @@ -24,6 +24,7 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; @@ -35,6 +36,7 @@ import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Test; /** @@ -119,7 +121,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); // Receive and resend with OpenWire JMS client - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); Connection jmsConnection = factory.createConnection(); try { Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -129,7 +131,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { Message received = jmsConsumer.receive(5000); assertNotNull(received); - assertTrue(received instanceof BytesMessage); + assertTrue(received instanceof ObjectMessage); MessageProducer jmsProducer = jmsSession.createProducer(destination); jmsProducer.send(received); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 0f006bc..70ff658 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -243,27 +243,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); } - @Test - public void testAmbiguousMessageRouting() throws Exception { - final String addressA = "addressA"; - final String queueA = "queueA"; - final String queueB = "queueB"; - final String queueC = "queueC"; - final String queueD = "queueD"; - - ActiveMQServerControl serverControl = server.getActiveMQServerControl(); - serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString()); - - sendMessages(addressA, 1); - - assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); - } - @Test(timeout = 60000) public void testMessageDurableFalse() throws Exception { sendMessages(getTestName(), 1, false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 7962005..39daee4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -16,28 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; - -import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -61,7 +39,24 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; @@ -71,14 +66,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -105,6 +99,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; + @RunWith(Parameterized.class) public class ProtonTest extends ProtonTestBase { @@ -224,6 +223,7 @@ public class ProtonTest extends ProtonTestBase { TextMessage message = session.createTextMessage("test-message"); producer.send(message); + producer.close(); connection.start(); @@ -378,7 +378,7 @@ public class ProtonTest extends ProtonTestBase { receiver.flow(1); // Shouldn't get this since we delayed the message. - assertNull(receiver.receive(5, TimeUnit.SECONDS)); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); } finally { connection.close(); } @@ -827,12 +827,7 @@ public class ProtonTest extends ProtonTestBase { AmqpReceiver receiver = session.createReceiver(coreAddress); server.destroyQueue(new SimpleString(coreAddress), null, false, true); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return receiver.isClosed(); - } - }); + Wait.waitFor(receiver::isClosed); assertTrue(receiver.isClosed()); } finally { amqpConnection.close(); @@ -851,12 +846,7 @@ public class ProtonTest extends ProtonTestBase { connection.disconnect(true); } - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return amqpConnection.isClosed(); - } - }); + Wait.waitFor(amqpConnection::isClosed); assertTrue(amqpConnection.isClosed()); assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition()); @@ -937,7 +927,7 @@ public class ProtonTest extends ProtonTestBase { request.setText("[]"); sender.send(request); - AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS); + AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS); Assert.assertNotNull(response); assertNotNull(response); Object section = response.getWrappedMessage().getBody(); @@ -1001,12 +991,7 @@ public class ProtonTest extends ProtonTestBase { final ActiveMQServer remote = createAMQPServer(5673); remote.start(); try { - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return remote.isActive(); - } - }); + Wait.waitFor(remote::isActive); } catch (Exception e) { remote.stop(); throw e; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java index beac414..847b69e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java @@ -63,7 +63,8 @@ public class AckBatchSizeTest extends ActiveMQTestBase { ActiveMQServer server = createServer(false); server.start(); int numMessages = 100; - ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * getMessageEncodeSize(addressA)).setBlockOnAcknowledge(true); + int originalSize = getMessageEncodeSize(addressA); + ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * originalSize).setBlockOnAcknowledge(true); ClientSessionFactory cf = createSessionFactory(locator); ClientSession sendSession = cf.createSession(false, true, true); @@ -71,20 +72,25 @@ public class AckBatchSizeTest extends ActiveMQTestBase { session.createQueue(addressA, queueA, false); ClientProducer cp = sendSession.createProducer(addressA); for (int i = 0; i < numMessages; i++) { - cp.send(sendSession.createMessage(false)); + ClientMessage message = (ClientMessage)sendSession.createMessage(false).setAddress(addressA); + Assert.assertEquals(originalSize, message.getEncodeSize()); + cp.send(message); + Assert.assertEquals(originalSize, message.getEncodeSize()); } ClientConsumer consumer = session.createConsumer(queueA); session.start(); for (int i = 0; i < numMessages - 1; i++) { + System.out.println("Receive "); ClientMessage m = consumer.receive(5000); - + Assert.assertEquals(0, m.getPropertyNames().size()); + Assert.assertEquals("expected to be " + originalSize, originalSize, m.getEncodeSize()); m.acknowledge(); } ClientMessage m = consumer.receive(5000); Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable(); - Assert.assertEquals(100, q.getDeliveringCount()); + Assert.assertEquals(numMessages, q.getDeliveringCount()); m.acknowledge(); Assert.assertEquals(0, q.getDeliveringCount()); sendSession.close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 0597dd5..042effd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -21,10 +21,14 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -34,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; @@ -332,10 +337,138 @@ public class AcknowledgeTest extends ActiveMQTestBase { } } - class FakeMessageWithID implements Message { + class FakeMessageWithID extends RefCountMessage { final long id; + @Override + public RoutingType getRouteType() { + return null; + } + + @Override + public SimpleString getReplyTo() { + return null; + } + + @Override + public Message setReplyTo(SimpleString address) { + return null; + } + + @Override + public boolean containsDeliveryAnnotationProperty(SimpleString property) { + return false; + } + + @Override + public Object removeDeliveryAnnoationProperty(SimpleString key) { + return null; + } + + @Override + public Object getDeliveryAnnotationProperty(SimpleString key) { + return null; + } + + @Override + public int getPersistSize() { + return 0; + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + } + + @Override + public Persister getPersister() { + return null; + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + + } + + @Override + public Long getScheduledDeliveryTime() { + return null; + } + + @Override + public ICoreMessage toCore() { + return null; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + @Override + public void sendBuffer(ByteBuf buffer, int count) { + + } + @Override + public Message setUserID(Object userID) { + return null; + } + + @Override + public void copyHeadersAndProperties(Message msg) { + + } + + @Override + public void messageChanged() { + + } + + @Override + public Message copy() { + return null; + } + + @Override + public Message copy(long newID) { + return null; + } + + @Override + public Message setMessageID(long id) { + return null; + } + + @Override + public int getRefCount() { + return 0; + } + + @Override + public int incrementRefCount() throws Exception { + return 0; + } + + @Override + public int decrementRefCount() throws Exception { + return 0; + } + + @Override + public int incrementDurableRefCount() { + return 0; + } + + @Override + public int decrementDurableRefCount() { + return 0; + } + + @Override + public int getMemoryEstimate() { + return 0; + } + FakeMessageWithID(final long id) { this.id = id; } @@ -351,23 +484,33 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public FakeMessageWithID setUserID(UUID userID) { - return this; + public String getAddress() { + return null; } @Override - public SimpleString getAddress() { + public SimpleString getAddressSimpleString() { return null; } @Override - public Message setAddress(SimpleString address) { + public Message setBuffer(ByteBuf buffer) { return null; } @Override - public byte getType() { - return 0; + public ByteBuf getBuffer() { + return null; + } + + @Override + public Message setAddress(String address) { + return null; + } + + @Override + public Message setAddress(SimpleString address) { + return null; } @Override @@ -426,16 +569,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public ActiveMQBuffer getBodyBuffer() { - return null; - } - - @Override - public ActiveMQBuffer getBodyBufferDuplicate() { - return null; - } - - @Override public Message putBooleanProperty(SimpleString key, boolean value) { return null; } @@ -689,15 +822,5 @@ public class AcknowledgeTest extends ActiveMQTestBase { public Map toPropertyMap() { return null; } - - @Override - public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) { - return this; - } - - @Override - public FakeMessageWithID writeBodyBufferString(String string) { - return this; - } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 8f00b2a..b957291 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -16,6 +16,18 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.Set; @@ -27,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -41,10 +54,13 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,15 +70,17 @@ import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class ConsumerTest extends ActiveMQTestBase { - @Parameterized.Parameters(name = "isNetty={0}") + @Parameterized.Parameters(name = "isNetty={0}, persistent={1}") public static Collection getParameters() { - return Arrays.asList(new Object[][]{{true}, {false}}); + return Arrays.asList(new Object[][]{{true, true}, {false, false}, {false, true}, {true, false}}); } - public ConsumerTest(boolean netty) { + public ConsumerTest(boolean netty, boolean durable) { this.netty = netty; + this.durable = durable; } + private final boolean durable; private final boolean netty; private ActiveMQServer server; @@ -79,13 +97,31 @@ public class ConsumerTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - server = createServer(false, isNetty()); + server = createServer(durable, isNetty()); server.start(); locator = createFactory(isNetty()); } + @Before + public void createQueue() throws Exception { + + ServerLocator locator = createFactory(isNetty()); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true, true); + + server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false); + + session.close(); + + sf.close(); + + locator.close(); + } + @Test public void testStressConnection() throws Exception { @@ -113,34 +149,220 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, false); - session.createQueue(QUEUE, QUEUE, null, false); - - ClientConsumer consumer = session.createConsumer(QUEUE); - ClientProducer producer = session.createProducer(QUEUE); ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4); message.getBodyBuffer().writeString("hi"); message.putStringProperty("hello", "elo"); producer.send(message); + session.commit(); + + session.close(); + if (durable) { + server.stop(); + server.start(); + } + sf = createSessionFactory(locator); + session = sf.createSession(false, true, true, false); + ClientConsumer consumer = session.createConsumer(QUEUE); + session.start(); if (cancelOnce) { - final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer; + final ClientConsumerInternal consumerInternal = (ClientConsumerInternal) consumer; Wait.waitFor(() -> consumerInternal.getBufferSize() > 0); consumer.close(); consumer = session.createConsumer(QUEUE); } ClientMessage message2 = consumer.receive(1000); + Assert.assertNotNull(message2); + System.out.println("Id::" + message2.getMessageID()); System.out.println("Received " + message2); + System.out.println("Clie:" + ByteUtil.bytesToHex(message2.getBuffer().array(), 4)); + + System.out.println("String::" + message2.getReadOnlyBodyBuffer().readString()); + + Assert.assertEquals("elo", message2.getStringProperty("hello")); + + Assert.assertEquals("hi", message2.getReadOnlyBodyBuffer().readString()); + session.close(); } + @Test + public void testSendReceiveAMQP() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(true, true); + } + + @Test + public void testSendReceiveCore() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(false, false); + } + + @Test + public void testSendAMQPReceiveCore() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(true, false); + } + + @Test + public void testSendCoreReceiveAMQP() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(false, true); + } + + + + public static class MyTest implements Serializable { + int i; + + public int getI() { + return i; + } + + public MyTest setI(int i) { + this.i = i; + return this; + } + } + + + public void internalSend(boolean amqpSender, boolean amqpConsumer) throws Throwable { + + ConnectionFactory factoryAMQP = new JmsConnectionFactory("amqp://localhost:61616"); + ConnectionFactory factoryCore = new ActiveMQConnectionFactory(); + + + Connection connection = (amqpSender ? factoryAMQP : factoryCore).createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(QUEUE.toString()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + long time = System.currentTimeMillis(); + int NUMBER_OF_MESSAGES = 100; + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage msg = session.createTextMessage("hello " + i); + msg.setIntProperty("mycount", i); + producer.send(msg); + + ObjectMessage objectMessage = session.createObjectMessage(new MyTest().setI(i)); + producer.send(objectMessage); + + MapMessage mapMessage = session.createMapMessage(); + mapMessage.setInt("intOne", i); + mapMessage.setString("stringOne", Integer.toString(i)); + producer.send(mapMessage); + + StreamMessage stream = session.createStreamMessage(); + stream.writeBoolean(true); + stream.writeInt(i); + producer.send(stream); + + BytesMessage bytes = session.createBytesMessage(); + bytes.writeUTF("string " + i); + producer.send(bytes); + } + long end = System.currentTimeMillis(); + + System.out.println("Time = " + (end - time)); + + { + TextMessage dummyMessage = session.createTextMessage(); + dummyMessage.setJMSType("car"); + dummyMessage.setStringProperty("color", "red"); + dummyMessage.setLongProperty("weight", 3000); + dummyMessage.setText("testSelectorExampleFromSpecs:1"); + producer.send(dummyMessage); + + dummyMessage = session.createTextMessage(); + dummyMessage.setJMSType("car"); + dummyMessage.setStringProperty("color", "blue"); + dummyMessage.setLongProperty("weight", 3000); + dummyMessage.setText("testSelectorExampleFromSpecs:2"); + producer.send(dummyMessage); + } + + + + + connection.close(); + + if (this.durable) { + server.stop(); + server.start(); + } + + connection = (amqpConsumer ? factoryAMQP : factoryCore).createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(QUEUE.toString()); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("mycount")); + Assert.assertEquals("hello " + i, message.getText()); + + ObjectMessage objectMessage = (ObjectMessage)consumer.receive(5000); + Assert.assertNotNull(objectMessage); + Assert.assertEquals(i, ((MyTest)objectMessage.getObject()).getI()); + + MapMessage mapMessage = (MapMessage) consumer.receive(1000); + Assert.assertNotNull(mapMessage); + Assert.assertEquals(i, mapMessage.getInt("intOne")); + Assert.assertEquals(Integer.toString(i), mapMessage.getString("stringOne")); + + StreamMessage stream = (StreamMessage)consumer.receive(5000); + Assert.assertTrue(stream.readBoolean()); + Assert.assertEquals(i, stream.readInt()); + + BytesMessage bytes = (BytesMessage) consumer.receive(5000); + Assert.assertEquals("string " + i, bytes.readUTF()); + } + + consumer.close(); + + consumer = session.createConsumer(queue, "JMSType = 'car' AND color = 'blue' AND weight > 2500"); + + TextMessage msg = (TextMessage) consumer.receive(1000); + Assert.assertEquals("testSelectorExampleFromSpecs:2", msg.getText()); + } finally { + connection.close(); + } + } @Test public void testConsumerAckImmediateAutoCommitTrue() throws Exception { @@ -148,8 +370,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -180,8 +400,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, false, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -212,8 +430,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -247,8 +463,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -284,11 +498,9 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); - final int numMessages = 10000; + final int numMessages = 100; for (int i = 0; i < numMessages; i++) { ClientMessage message = createTextMessage(session, "m" + i); @@ -338,8 +550,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - session.start(); ClientProducer producer = session.createProducer(QUEUE); @@ -372,8 +582,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); session.start(); - session.createQueue(QUEUE, QUEUE, null, false); - ClientConsumer consumer = session.createConsumer(QUEUE); consumer.setMessageHandler(new MessageHandler() { @@ -394,8 +602,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, true, true); - session.createQueue(QUEUE, QUEUE, null, false); - ClientConsumer consumer = session.createConsumer(QUEUE); consumer.setMessageHandler(new MessageHandler() { @@ -436,7 +642,7 @@ public class ConsumerTest extends ActiveMQTestBase { sessions.add(session); - session.createQueue(QUEUE, QUEUE.concat("" + i), null, false); + session.createQueue(QUEUE, QUEUE.concat("" + i), null, true); if (i == 0) { session.createQueue(QUEUE_RESPONSE, QUEUE_RESPONSE); @@ -550,8 +756,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createTransactedSession(); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; @@ -598,7 +802,6 @@ public class ConsumerTest extends ActiveMQTestBase { ServerLocator locator = addServerLocator(ServerLocatorImpl.newLocator("vm:/1")); ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(); - session.createQueue(QUEUE, QUEUE); ClientProducer producer = session.createProducer(QUEUE); producer.send(session.createMessage(true)); @@ -620,8 +823,6 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSession session = sf.createTransactedSession(); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = session.createProducer(QUEUE); final int numMessages = 100; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 81e0ca4..201a96b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -56,7 +58,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; @@ -519,7 +520,7 @@ public class HangConsumerTest extends ActiveMQTestBase { * @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int) */ @Override - public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { inCall.countDown(); try { callbackSemaphore.acquire(); @@ -541,7 +542,7 @@ public class HangConsumerTest extends ActiveMQTestBase { */ @Override public int sendLargeMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) {