Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E666F200C53 for ; Mon, 6 Mar 2017 12:53:52 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E4DA6160B73; Mon, 6 Mar 2017 11:53:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7B7B4160B8B for ; Mon, 6 Mar 2017 12:53:50 +0100 (CET) Received: (qmail 42985 invoked by uid 500); 6 Mar 2017 11:53:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42230 invoked by uid 99); 6 Mar 2017 11:53:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Mar 2017 11:53:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BD5C1DFF5D; Mon, 6 Mar 2017 11:53:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Mon, 06 Mar 2017 11:53:56 -0000 Message-Id: <35eecbd1a8554c65ad93a83b239657ae@git.apache.org> In-Reply-To: <49415fef219940f3b0d08a6a8ade4334@git.apache.org> References: <49415fef219940f3b0d08a6a8ade4334@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. archived-at: Mon, 06 Mar 2017 11:53:53 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2943f15..f0f8e97 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -49,7 +49,6 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.persistence.QueueStatus; @@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -440,12 +438,12 @@ public class QueueImpl implements Queue { } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { context.addQueue(address, this); } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) { + public void routeWithAck(Message message, RoutingContext context) { context.addQueueWithAck(address, this); } @@ -922,7 +920,7 @@ public class QueueImpl implements Queue { } @Override - public boolean hasMatchingConsumer(final ServerMessage message) { + public boolean hasMatchingConsumer(final Message message) { for (ConsumerHolder holder : consumerList) { Consumer consumer = holder.consumer; @@ -1055,7 +1053,7 @@ public class QueueImpl implements Queue { pageSubscription.ack((PagedReference) ref); postAcknowledge(ref); } else { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); boolean durableRef = message.isDurable() && durable; @@ -1087,7 +1085,7 @@ public class QueueImpl implements Queue { getRefsOperation(tx).addAck(ref); } else { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); boolean durableRef = message.isDurable() && durable; @@ -1111,7 +1109,7 @@ public class QueueImpl implements Queue { @Override public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); if (message.isDurable() && durable) { tx.setContainsPersistent(); @@ -1216,11 +1214,11 @@ public class QueueImpl implements Queue { return expiryAddress; } - private SimpleString extractAddress(ServerMessage message) { - if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { - return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS); + private SimpleString extractAddress(Message message) { + if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { + return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()); } else { - return message.getAddress(); + return message.getAddressSimpleString(); } } @@ -1244,7 +1242,7 @@ public class QueueImpl implements Queue { List scheduledMessages = scheduledDeliveryHandler.cancel(null); if (scheduledMessages != null && scheduledMessages.size() > 0) { for (MessageReference ref : scheduledMessages) { - ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime()); + ref.getMessage().setScheduledDeliveryTime(ref.getScheduledDeliveryTime()); ref.setScheduledDeliveryTime(0); } this.addHead(scheduledMessages, true); @@ -2274,7 +2272,7 @@ public class QueueImpl implements Queue { public boolean checkRedelivery(final MessageReference reference, final long timeBase, final boolean ignoreRedeliveryDelay) throws Exception { - ServerMessage message = reference.getMessage(); + Message message = reference.getMessage(); if (internalQueue) { if (logger.isTraceEnabled()) { @@ -2337,7 +2335,7 @@ public class QueueImpl implements Queue { final boolean expiry, final boolean rejectDuplicate, final long... queueIDs) throws Exception { - ServerMessage copyMessage = makeCopy(ref, expiry); + Message copyMessage = makeCopy(ref, expiry); copyMessage.setAddress(toAddress); @@ -2346,7 +2344,7 @@ public class QueueImpl implements Queue { for (long id : queueIDs) { buffer.putLong(id); } - copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array()); } postOffice.route(copyMessage, tx, false, rejectDuplicate); @@ -2358,7 +2356,7 @@ public class QueueImpl implements Queue { private void moveBetweenSnFQueues(final SimpleString queueSuffix, final Transaction tx, final MessageReference ref) throws Exception { - ServerMessage copyMessage = makeCopy(ref, false, false); + Message copyMessage = makeCopy(ref, false, false); byte[] oldRouteToIDs = null; String targetNodeID; @@ -2366,8 +2364,8 @@ public class QueueImpl implements Queue { // remove the old route for (SimpleString propName : copyMessage.getPropertyNames()) { - if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) { - oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName); + if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) { + oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName.toString()); final String hashcodeToString = oldRouteToIDs.toString(); // don't use Arrays.toString(..) here logger.debug("Removed property from message: " + propName + " = " + hashcodeToString + " (" + ByteBuffer.wrap(oldRouteToIDs).getLong() + ")"); @@ -2420,7 +2418,7 @@ public class QueueImpl implements Queue { } private Pair locateTargetBinding(SimpleString queueSuffix, - ServerMessage copyMessage, + Message copyMessage, long oldQueueID) { String targetNodeID = null; Binding targetBinding = null; @@ -2440,7 +2438,7 @@ public class QueueImpl implements Queue { // parse the queue name of the remote queue binding to determine the node ID String temp = remoteQueueBinding.getQueue().getName().toString(); targetNodeID = temp.substring(temp.lastIndexOf(".") + 1); - logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddress() + " on node " + targetNodeID); + logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID); // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding for (Map.Entry entry2 : postOffice.getAllBindings().entrySet()) { @@ -2468,14 +2466,14 @@ public class QueueImpl implements Queue { return new Pair<>(targetNodeID, targetBinding); } - private ServerMessage makeCopy(final MessageReference ref, final boolean expiry) throws Exception { + private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception { return makeCopy(ref, expiry, true); } - private ServerMessage makeCopy(final MessageReference ref, + private Message makeCopy(final MessageReference ref, final boolean expiry, final boolean copyOriginalHeaders) throws Exception { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); /* We copy the message and send that to the dla/expiry queue - this is because otherwise we may end up with a ref with the same message id in the @@ -2487,7 +2485,15 @@ public class QueueImpl implements Queue { long newID = storageManager.generateID(); - ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, ref, expiry, copyOriginalHeaders); + Message copy = message.copy(newID); + + if (copyOriginalHeaders) { + copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null); + } + + if (expiry) { + copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis()); + } return copy; } @@ -2549,7 +2555,7 @@ public class QueueImpl implements Queue { tx = new TransactionImpl(storageManager); } - ServerMessage copyMessage = makeCopy(ref, reason == AckReason.EXPIRED); + Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED); copyMessage.setAddress(address); @@ -2719,7 +2725,7 @@ public class QueueImpl implements Queue { return; } - ServerMessage message; + Message message; try { message = ref.getMessage(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 8e3a94b..0f3da07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -22,12 +22,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; @@ -122,7 +122,7 @@ public class RefsOperation extends TransactionOperationAbstract { try { Transaction ackedTX = new TransactionImpl(storageManager); for (MessageReference ref : ackedRefs) { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); if (message.isDurable()) { int durableRefCount = message.incrementDurableRefCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index a5f96b1..4590c0b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.ClusterControl; import org.apache.activemq.artemis.core.server.cluster.ClusterController; import org.apache.activemq.artemis.core.transaction.ResourceManager; @@ -193,7 +191,7 @@ public class ScaleDownHandler { buffer.putLong(queueID); } - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array()); if (logger.isDebugEnabled()) { if (messageReference.isPaged()) { @@ -264,11 +262,11 @@ public class ScaleDownHandler { byte[] oldRouteToIDs = null; List propertiesToRemove = new ArrayList<>(); - message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS); + message.removeProperty(Message.HDR_ROUTE_TO_IDS.toString()); for (SimpleString propName : message.getPropertyNames()) { - if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) { + if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) { if (propName.toString().endsWith(propertyEnd)) { - oldRouteToIDs = message.getBytesProperty(propName); + oldRouteToIDs = message.getBytesProperty(propName.toString()); } propertiesToRemove.add(propName); } @@ -277,16 +275,17 @@ public class ScaleDownHandler { // TODO: what if oldRouteToIDs == null ?? for (SimpleString propertyToRemove : propertiesToRemove) { - message.removeProperty(propertyToRemove); + message.removeProperty(propertyToRemove.toString()); } if (queueOnTarget) { - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), oldRouteToIDs); } else { - message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs); + message.putBytesProperty(Message.HDR_SCALEDOWN_TO_IDS.toString(), oldRouteToIDs); } logger.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId); + producer.send(message.getAddress(), message); messageCount++; @@ -322,13 +321,13 @@ public class ScaleDownHandler { List allOperations = transaction.getAllOperations(); // Get the information of the Prepared TXs so it could replay the TXs - Map, List>> queuesToSendTo = new HashMap<>(); + Map, List>> queuesToSendTo = new HashMap<>(); for (TransactionOperation operation : allOperations) { if (operation instanceof PostOfficeImpl.AddOperation) { PostOfficeImpl.AddOperation addOperation = (PostOfficeImpl.AddOperation) operation; List refs = addOperation.getRelatedMessageReferences(); for (MessageReference ref : refs) { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); Queue queue = ref.getQueue(); long queueID; String queueName = queue.getName().toString(); @@ -336,7 +335,7 @@ public class ScaleDownHandler { if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); } else { - queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress()); + queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } Pair, List> queueIds = queuesToSendTo.get(message); @@ -350,7 +349,7 @@ public class ScaleDownHandler { RefsOperation refsOperation = (RefsOperation) operation; List refs = refsOperation.getReferencesToAcknowledge(); for (MessageReference ref : refs) { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); Queue queue = ref.getQueue(); long queueID; String queueName = queue.getName().toString(); @@ -358,7 +357,7 @@ public class ScaleDownHandler { if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); } else { - queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress()); + queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } Pair, List> queueIds = queuesToSendTo.get(message); @@ -373,23 +372,23 @@ public class ScaleDownHandler { } ClientProducer producer = session.createProducer(); - for (Map.Entry, List>> entry : queuesToSendTo.entrySet()) { + for (Map.Entry, List>> entry : queuesToSendTo.entrySet()) { List ids = entry.getValue().getA(); ByteBuffer buffer = ByteBuffer.allocate(ids.size() * 8); for (Long id : ids) { buffer.putLong(id); } - ServerMessage message = entry.getKey(); - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + Message message = entry.getKey(); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array()); ids = entry.getValue().getB(); if (ids.size() > 0) { buffer = ByteBuffer.allocate(ids.size() * 8); for (Long id : ids) { buffer.putLong(id); } - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS, buffer.array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_ACK_IDS.toString(), buffer.array()); } - producer.send(message.getAddress(), message); + producer.send(message.getAddressSimpleString().toString(), message); } session.end(xid, XAResource.TMSUCCESS); session.prepare(xid); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index bcc6df1..710a22b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -31,12 +31,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; @@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -205,7 +206,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.creationTime = System.currentTimeMillis(); - if (browseOnly) { browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator()); } else { @@ -341,7 +341,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } return HandleStatus.BUSY; } - final ServerMessage message = ref.getMessage(); + final Message message = ref.getMessage(); if (filter != null && !filter.match(message)) { if (logger.isTraceEnabled()) { @@ -400,7 +400,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void proceedDeliver(MessageReference reference) throws Exception { try { - ServerMessage message = reference.getMessage(); + Message message = reference.getMessage(); if (message.isLargeMessage() && supportLargeMessage) { if (largeMessageDeliverer == null) { @@ -507,17 +507,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * there are no other messages to be delivered. */ @Override - public void forceDelivery(final long sequence) { - forceDelivery(sequence, new Runnable() { - @Override - public void run() { - ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50); + public void forceDelivery(final long sequence) { + forceDelivery(sequence, () -> { + Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); - forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); - forcedDeliveryMessage.setAddress(messageQueue.getName()); + forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); + forcedDeliveryMessage.setAddress(messageQueue.getName()); + + callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); - callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); - } }); } @@ -1018,7 +1016,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * @param ref * @param message */ - private void deliverStandardMessage(final MessageReference ref, final ServerMessage message) { + private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { @@ -1070,7 +1068,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { */ private long positionPendingLargeMessage; - private BodyEncoder context; + private LargeBodyEncoder context; private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception { largeMessage = message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java deleted file mode 100644 index 39e77ca..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server.impl; - -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; -import org.apache.activemq.artemis.core.paging.PagingStore; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.artemis.utils.MemorySize; -import org.apache.activemq.artemis.utils.TypedProperties; - -public class ServerMessageImpl extends MessageImpl implements ServerMessage { - - private final AtomicInteger durableRefCount = new AtomicInteger(); - - private final AtomicInteger refCount = new AtomicInteger(); - - private PagingStore pagingStore; - - private static final int memoryOffset; - - private boolean persisted = false; - - static { - // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties - // Note, it is only an estimate, it's not possible to be entirely sure with Java - // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof - // The value is somewhat higher on 64 bit architectures, probably due to different alignment - - if (MemorySize.is64bitArch()) { - memoryOffset = 352; - } else { - memoryOffset = 232; - } - } - - /* - * Constructor for when reading from network - */ - public ServerMessageImpl() { - } - - /* - * Construct a MessageImpl from storage, or notification, or before routing - */ - public ServerMessageImpl(final long messageID, final int initialMessageBufferSize) { - super(initialMessageBufferSize); - - this.messageID = messageID; - } - - /* - * Copy constructor - */ - protected ServerMessageImpl(final ServerMessageImpl other) { - super(other); - } - - /* - * Copy constructor - */ - protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties) { - super(other, properties); - } - - @Override - public boolean isServerMessage() { - return true; - } - - @Override - public ServerMessageImpl setMessageID(final long id) { - messageID = id; - return this; - } - - @Override - public MessageReference createReference(final Queue queue) { - MessageReference ref = new MessageReferenceImpl(this, queue); - - return ref; - } - - @Override - public boolean hasInternalProperties() { - return properties.hasInternalProperties(); - } - - @Override - public int incrementRefCount() throws Exception { - int count = refCount.incrementAndGet(); - - if (pagingStore != null) { - if (count == 1) { - pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate()); - } else { - pagingStore.addSize(MessageReferenceImpl.getMemoryEstimate()); - } - } - - return count; - } - - @Override - public int decrementRefCount() throws Exception { - int count = refCount.decrementAndGet(); - - if (count < 0) { - // this could happen on paged messages since they are not routed and incrementRefCount is never called - return count; - } - - if (pagingStore != null) { - if (count == 0) { - pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate()); - - if (buffer != null) { - // release the buffer now - buffer.byteBuf().release(); - } - } else { - pagingStore.addSize(-MessageReferenceImpl.getMemoryEstimate()); - } - } - - return count; - } - - @Override - public int incrementDurableRefCount() { - return durableRefCount.incrementAndGet(); - } - - @Override - public int decrementDurableRefCount() { - return durableRefCount.decrementAndGet(); - } - - @Override - public int getRefCount() { - return refCount.get(); - } - - @Override - public boolean isLargeMessage() { - return false; - } - - private volatile int memoryEstimate = -1; - - @Override - public int getMemoryEstimate() { - if (memoryEstimate == -1) { - memoryEstimate = ServerMessageImpl.memoryOffset + buffer.capacity() + properties.getMemoryOffset(); - } - - return memoryEstimate; - } - - @Override - public ServerMessage copy(final long newID) { - ServerMessage m = new ServerMessageImpl(this); - - m.setMessageID(newID); - - return m; - } - - @Override - public ServerMessage copy() { - // This is a simple copy, used only to avoid changing original properties - return new ServerMessageImpl(this); - } - - public ServerMessage makeCopyForExpiryOrDLA(final long newID, - MessageReference originalReference, - final boolean expiry) throws Exception { - return makeCopyForExpiryOrDLA(newID, originalReference, expiry, true); - } - - @Override - public ServerMessage makeCopyForExpiryOrDLA(final long newID, - MessageReference originalReference, - final boolean expiry, - final boolean copyOriginalHeaders) throws Exception { - /* - We copy the message and send that to the dla/expiry queue - this is - because otherwise we may end up with a ref with the same message id in the - queue more than once which would barf - this might happen if the same message had been - expire from multiple subscriptions of a topic for example - We set headers that hold the original message address, expiry time - and original message id - */ - - ServerMessage copy = copy(newID); - - if (copyOriginalHeaders) { - copy.setOriginalHeaders(this, originalReference, expiry); - } - - return copy; - } - - @Override - public void setOriginalHeaders(final ServerMessage other, - final MessageReference originalReference, - final boolean expiry) { - SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE); - - if (originalQueue != null) { - putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); - } else if (originalReference != null) { - putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName()); - } - - if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { - putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS)); - - putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID)); - } else { - putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress()); - - putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID()); - } - - // reset expiry - setExpiration(0); - - if (expiry) { - long actualExpiryTime = System.currentTimeMillis(); - - putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime); - } - - bufferValid = false; - } - - @Override - public void setPagingStore(final PagingStore pagingStore) { - this.pagingStore = pagingStore; - - // On the server side, we reset the address to point to the instance of address in the paging store - // Otherwise each message would have its own copy of the address String which would take up more memory - address = pagingStore.getAddress(); - } - - @Override - public synchronized void forceAddress(final SimpleString address) { - this.address = address; - bufferValid = false; - } - - @Override - public PagingStore getPagingStore() { - return pagingStore; - } - - @Override - public boolean storeIsPaging() { - if (pagingStore != null) { - return pagingStore.isPaging(); - } else { - return false; - } - } - - @Override - public String toString() { - try { - return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() + - ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + - ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); - } catch (Throwable e) { - return "ServerMessage[messageID=" + messageID + "]"; - } - } - - private static String toDate(long timestamp) { - if (timestamp == 0) { - return "0"; - } else { - return new java.util.Date(timestamp).toString(); - } - - } - - @Override - public InputStream getBodyInputStream() { - return null; - } - - // Encoding stuff - - @Override - public void encodeMessageIDToBuffer() { - // We first set the message id - this needs to be set on the buffer since this buffer will be re-used - - buffer.setLong(buffer.getInt(MessageImpl.BUFFER_HEADER_SPACE) + DataConstants.SIZE_INT, messageID); - } - - @Override - public byte[] getDuplicateIDBytes() { - Object duplicateID = getDuplicateProperty(); - - if (duplicateID == null) { - return null; - } else { - if (duplicateID instanceof SimpleString) { - return ((SimpleString) duplicateID).getData(); - } else { - return (byte[]) duplicateID; - } - } - } - - @Override - public Object getDuplicateProperty() { - return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 52ecda1..97187e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -41,12 +42,10 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -66,14 +65,13 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; -import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.TempQueueObserver; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -90,7 +88,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.utils.JsonLoader; import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.TypedProperties; -import org.apache.activemq.artemis.utils.UUID; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; @@ -155,9 +152,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private final SimpleString managementAddress; - // The current currentLargeMessage being processed - private volatile LargeServerMessage currentLargeMessage; - protected final RoutingContext routingContext = new RoutingContextImpl(null); protected final SessionCallback callback; @@ -171,7 +165,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private final OperationContext context; // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here - protected final Map> targetAddressInfos = new HashMap<>(); + protected final Map> targetAddressInfos = new HashMap<>(); private final long creationTime = System.currentTimeMillis(); @@ -187,6 +181,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private Map prefixes; + private Set closeables; + public ServerSessionImpl(final String name, final String username, final String password, @@ -273,6 +269,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public void addCloseable(Closeable closeable) { + if (closeables == null) { + closeables = new HashSet<>(); + } + this.closeables.add(closeable); + } + + @Override public void disableSecurity() { this.securityEnabled = false; } @@ -376,11 +380,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { consumers.clear(); - if (currentLargeMessage != null) { - try { - currentLargeMessage.deleteFile(); - } catch (Throwable error) { - ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + if (closeables != null) { + for (Closeable closeable : closeables) { + closeable.close(failed); } } @@ -1272,30 +1274,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public void sendLarge(final MessageInternal message) throws Exception { - // need to create the LargeMessage before continue - long id = storageManager.generateID(); - - LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message); - - if (logger.isTraceEnabled()) { - logger.trace("sendLarge::" + largeMsg); - } - - if (currentLargeMessage != null) { - ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID()); - } - - currentLargeMessage = largeMsg; - } - - @Override - public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception { + public RoutingStatus send(final Message message, final boolean direct) throws Exception { return send(message, direct, false); } @Override - public RoutingStatus send(final ServerMessage message, + public RoutingStatus send(final Message message, final boolean direct, boolean noAutoCreateQueue) throws Exception { return send(getCurrentTransaction(), message, direct, noAutoCreateQueue); @@ -1303,7 +1287,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public RoutingStatus send(Transaction tx, - final ServerMessage message, + final Message message, final boolean direct, boolean noAutoCreateQueue) throws Exception { @@ -1319,19 +1303,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener { //case the id header already generated. if (!message.isLargeMessage()) { long id = storageManager.generateID(); - + // This will re-encode the message message.setMessageID(id); - message.encodeMessageIDToBuffer(); } if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); } - SimpleString address = removePrefix(message.getAddress()); + SimpleString originalAddress = message.getAddressSimpleString(); + + SimpleString address = removePrefix(message.getAddressSimpleString()); // In case the prefix was removed, we also need to update the message - if (address != message.getAddress()) { + if (address != message.getAddressSimpleString()) { message.setAddress(address); } @@ -1340,14 +1325,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } if (address == null) { - if (message.isDurable()) { - // We need to force a re-encode when the message gets persisted or when it gets reloaded - // it will have no address - message.setAddress(defaultAddress); - } else { - // We don't want to force a re-encode when the message gets sent to the consumer - message.setAddressTransient(defaultAddress); - } + // We don't want to force a re-encode when the message gets sent to the consumer + message.setAddress(defaultAddress); } if (logger.isTraceEnabled()) { @@ -1359,42 +1338,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener { throw ActiveMQMessageBundle.BUNDLE.noAddress(); } - if (message.getAddress().equals(managementAddress)) { + if (message.getAddressSimpleString().equals(managementAddress)) { // It's a management message handleManagementMessage(tx, message, direct); } else { - result = doSend(tx, message, direct, noAutoCreateQueue); + result = doSend(tx, message, originalAddress, direct, noAutoCreateQueue); } return result; } - @Override - public void sendContinuations(final int packetSize, - final long messageBodySize, - final byte[] body, - final boolean continues) throws Exception { - if (currentLargeMessage == null) { - throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised(); - } - - // Immediately release the credits for the continuations- these don't contribute to the in-memory size - // of the message - - currentLargeMessage.addBytes(body); - - if (!continues) { - currentLargeMessage.releaseResources(); - - if (messageBodySize >= 0) { - currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); - } - - doSend(tx, currentLargeMessage, false, false); - - currentLargeMessage = null; - } - } @Override public void requestProducerCredits(SimpleString address, final int credits) throws Exception { @@ -1456,7 +1409,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public String[] getTargetAddresses() { - Map> copy = cloneTargetAddresses(); + Map> copy = cloneTargetAddresses(); Iterator iter = copy.keySet().iterator(); int num = copy.keySet().size(); String[] addresses = new String[num]; @@ -1470,7 +1423,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public String getLastSentMessageID(String address) { - Pair value = targetAddressInfos.get(SimpleString.toSimpleString(address)); + Pair value = targetAddressInfos.get(SimpleString.toSimpleString(address)); if (value != null) { return value.getA().toString(); } else { @@ -1489,9 +1442,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void describeProducersInfo(JsonArrayBuilder array) throws Exception { - Map> targetCopy = cloneTargetAddresses(); + Map> targetCopy = cloneTargetAddresses(); - for (Map.Entry> entry : targetCopy.entrySet()) { + for (Map.Entry> entry : targetCopy.entrySet()) { String uuid = null; if (entry.getValue().getA() != null) { uuid = entry.getValue().getA().toString(); @@ -1566,14 +1519,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { connectionFailed(me, failedOver); } - public void clearLargeMessage() { - currentLargeMessage = null; - } - private void installJMSHooks() { } - private Map> cloneTargetAddresses() { + private Map> cloneTargetAddresses() { return new HashMap<>(targetAddressInfos); } @@ -1588,10 +1537,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } private RoutingStatus handleManagementMessage(final Transaction tx, - final ServerMessage message, + final Message message, final boolean direct) throws Exception { try { - securityCheck(removePrefix(message.getAddress()), CheckType.MANAGE, this); + securityCheck(removePrefix(message.getAddressSimpleString()), CheckType.MANAGE, this); } catch (ActiveMQException e) { if (!autoCommitSends) { tx.markAsRollbackOnly(e); @@ -1599,9 +1548,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { throw e; } - ServerMessage reply = managementService.handleMessage(message); + Message reply = managementService.handleMessage(message); - SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME); + SimpleString replyTo = message.getReplyTo(); if (replyTo != null) { // TODO: move this check somewhere else? this is a JMS-specific bit of logic in the core impl @@ -1612,7 +1561,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } reply.setAddress(replyTo); - doSend(tx, reply, direct, false); + doSend(tx, reply, null, direct, false); } return RoutingStatus.OK; @@ -1669,21 +1618,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener { theTx.rollback(); } + @Override public RoutingStatus doSend(final Transaction tx, - final ServerMessage msg, + final Message msg, + final SimpleString originalAddress, final boolean direct, final boolean noAutoCreateQueue) throws Exception { RoutingStatus result = RoutingStatus.OK; - /** - * TODO Checking message properties on each message is expensive. Instead we should update the API and Core Packets - * to add the RoutingType information directly. - */ - RoutingType routingType = null; - if (msg.containsProperty(Message.HDR_ROUTING_TYPE)) { - routingType = RoutingType.getType(msg.getByteProperty(Message.HDR_ROUTING_TYPE)); - } - Pair art = getAddressAndRoutingType(msg.getAddress(), routingType); + RoutingType routingType = msg.getRouteType(); + + /* TODO-now: How to address here with AMQP? + if (originalAddress != null) { + if (originalAddress.toString().startsWith("anycast:")) { + routingType = RoutingType.ANYCAST; + } else if (originalAddress.toString().startsWith("multicast:")) { + routingType = RoutingType.MULTICAST; + } + } */ + + Pair art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); // Consumer // check the user has write access to this address. @@ -1707,10 +1661,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { result = postOffice.route(msg, routingContext, direct); - Pair value = targetAddressInfos.get(msg.getAddress()); + Pair value = targetAddressInfos.get(msg.getAddressSimpleString()); if (value == null) { - targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1))); + targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); } else { value.setA(msg.getUserID()); value.getB().incrementAndGet(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java index 0222928..29a2e47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java @@ -21,6 +21,9 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; @@ -41,8 +44,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -128,5 +129,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen Object[] getResources(Class resourceType); - ServerMessage handleMessage(ServerMessage message) throws Exception; + ICoreMessage handleMessage(Message message) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 55f2aea..f45aea7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -33,7 +33,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.AcceptorControl; @@ -56,6 +59,7 @@ import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImp import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.DivertControlImpl; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.messagecounter.MessageCounter; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; @@ -71,13 +75,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; @@ -365,9 +366,11 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public ServerMessage handleMessage(final ServerMessage message) throws Exception { + public ICoreMessage handleMessage(Message message) throws Exception { + message = message.toCore(); // a reply message is sent with the result stored in the message body. - ServerMessage reply = new ServerMessageImpl(storageManager.generateID(), 512); + CoreMessage reply = new CoreMessage(storageManager.generateID(), 512); + reply.setReplyTo(message.getReplyTo()); String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME); if (logger.isDebugEnabled()) { @@ -631,7 +634,7 @@ public class ManagementServiceImpl implements ManagementService { long messageID = storageManager.generateID(); - ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512); + Message notificationMessage = new CoreMessage(messageID, 512); // Notification messages are always durable so the user can choose whether to add a durable queue to // consume them in http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java index efe4cf9..0ee1b7d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.utils.JsonLoader; @@ -97,7 +97,7 @@ public abstract class TransactionDetail { msgJson.add(KEY_MSG_OP_TYPE, opType); - ServerMessage msg = ref.getMessage().copy(); + Message msg = ref.getMessage().copy(); msgJson.add(KEY_MSG_TYPE, decodeMessageType(msg)); JsonUtil.addToObject(KEY_MSG_PROPERTIES, decodeMessageProperties(msg), msgJson); @@ -108,7 +108,7 @@ public abstract class TransactionDetail { return detailJson.build(); } - public abstract String decodeMessageType(ServerMessage msg); + public abstract String decodeMessageType(Message msg); - public abstract Map decodeMessageProperties(ServerMessage msg); + public abstract Map decodeMessageProperties(Message msg); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java index 4730596..95036da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.transaction.impl; import javax.transaction.xa.Xid; import java.util.Map; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionDetail; @@ -31,8 +31,11 @@ public class CoreTransactionDetail extends TransactionDetail { } @Override - public String decodeMessageType(ServerMessage msg) { - int type = msg.getType(); + public String decodeMessageType(Message msg) { + if (!(msg instanceof ICoreMessage)) { + return "N/A"; + } + int type = ((ICoreMessage)msg).getType(); switch (type) { case Message.DEFAULT_TYPE: // 0 return "Default"; @@ -52,7 +55,7 @@ public class CoreTransactionDetail extends TransactionDetail { } @Override - public Map decodeMessageProperties(ServerMessage msg) { + public Map decodeMessageProperties(Message msg) { return msg.toMap(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java index a342e13..a440e31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java @@ -16,12 +16,12 @@ */ package org.apache.activemq.artemis.spi.core.protocol; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; -// TODO: use this interface properly on OpenWire -public interface MessageConverter { +public interface MessageConverter { - ServerMessage inbound(Object messageInbound) throws Exception; + ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception; - Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception; + ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java new file mode 100644 index 0000000..14891f5 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.spi.core.protocol; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.jboss.logging.Logger; + +public class MessagePersister implements Persister { + + private static final Logger logger = Logger.getLogger(MessagePersister.class); + + private static final MessagePersister theInstance = new MessagePersister(); + + /** This will be used for reading messages */ + private static Map> protocols = new ConcurrentHashMap<>(); + + + public static void registerProtocol(ProtocolManagerFactory manager) { + Persister messagePersister = manager.getPersister(); + if (messagePersister == null) { + logger.warn("Cannot find persister for " + manager); + } else { + registerPersister(manager.getStoreID(), manager.getPersister()); + } + } + + public static void clearPersisters() { + protocols.clear(); + } + + public static void registerPersister(byte recordType, Persister persister) { + protocols.put(recordType, persister); + } + + public static MessagePersister getInstance() { + return theInstance; + } + + + protected MessagePersister() { + } + + protected byte getID() { + return (byte)0; + } + + @Override + public int getEncodeSize(Message record) { + return 0; + } + + + /** Sub classes must add the first short as the protocol-id */ + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + buffer.writeByte(getID()); + } + + @Override + public Message decode(ActiveMQBuffer buffer, Message record) { + byte protocol = buffer.readByte(); + Persister persister = protocols.get(protocol); + if (persister == null) { + throw new NullPointerException("couldn't find factory for type=" + protocol); + } + return persister.decode(buffer, record); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index 890fbfe..e29d74d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -22,12 +22,14 @@ import java.util.Map; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +/** + * Info: ProtocolManager is loaded by {@link org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl#loadProtocolManagerFactories(Iterable)} */ public interface ProtocolManager

{ ProtocolManagerFactory

getFactory(); @@ -51,14 +53,6 @@ public interface ProtocolManager

{ boolean isProtocol(byte[] array); /** - * Gets the Message Converter towards ActiveMQ Artemis. - * Notice this being null means no need to convert - * - * @return - */ - MessageConverter getConverter(); - - /** * If this protocols accepts connectoins without an initial handshake. * If true this protocol will be the failback case no other connections are made. * New designed protocols should always require a handshake. This is only useful for legacy protocols. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java index d3b1b2e..9574540 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java @@ -20,10 +20,25 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; public interface ProtocolManagerFactory

{ + /** This is to be used to store the protocol-id on Messages. + * Messages are stored on their bare format. + * The protocol manager will be responsible to code or decode messages. + * The caveat here is that the first short-sized bytes need to be this constant. */ + default byte getStoreID() { + return (byte)0; + } + + default Persister getPersister() { + return null; + } + + /** * When you create the ProtocolManager, you should filter out any interceptors that won't belong * to this Protocol. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index ee236c7..799e8b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.artemis.spi.core.protocol; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public interface SessionCallback { @@ -55,10 +55,10 @@ public interface SessionCallback { // and I wanted to avoid re-fetching paged data in case of GCs on this specific case. // // Future developments may change this, but beware why I have chosen to keep the parameter separated here - int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumerID, int deliveryCount); + int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount); int sendLargeMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumerID, long bodySize, int deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 0c33a35..6fdef44 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -682,22 +682,6 @@ - - - - XXX Only meant to be used by project developers - - - - - - - - XXX Only meant to be used by project developers - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index f374979..5e9a95a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -77,7 +77,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase { Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO()); Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO()); Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(), conf.isLogJournalWriteRate()); - Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), conf.getJournalPerfBlastPages()); Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(), conf.isMessageCounterEnabled()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterMaxDayHistory(), conf.getMessageCounterMaxDayHistory()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterSamplePeriod(), conf.getMessageCounterSamplePeriod()); @@ -232,10 +231,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase { conf.setLogJournalWriteRate(b); Assert.assertEquals(b, conf.isLogJournalWriteRate()); - i = RandomUtil.randomInt(); - conf.setJournalPerfBlastPages(i); - Assert.assertEquals(i, conf.getJournalPerfBlastPages()); - l = RandomUtil.randomLong(); conf.setServerDumpInterval(l); Assert.assertEquals(l, conf.getServerDumpInterval()); @@ -434,10 +429,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase { conf.setLogJournalWriteRate(b); Assert.assertEquals(b, conf.isLogJournalWriteRate()); - i = RandomUtil.randomInt(); - conf.setJournalPerfBlastPages(i); - Assert.assertEquals(i, conf.getJournalPerfBlastPages()); - l = RandomUtil.randomLong(); conf.setServerDumpInterval(l); Assert.assertEquals(l, conf.getServerDumpInterval()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java index d73accd..1eb749b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.filter.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.tests.util.SilentTestCase; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; @@ -35,13 +35,13 @@ public class FilterTest extends SilentTestCase { private Filter filter; - private ServerMessage message; + private Message message; @Override @Before public void setUp() throws Exception { super.setUp(); - message = new ServerMessageImpl(1, 1000); + message = new CoreMessage().initBuffer(1024).setMessageID(1); } @Test @@ -59,7 +59,7 @@ public class FilterTest extends SilentTestCase { message.putStringProperty(new SimpleString("color"), new SimpleString("RED")); Assert.assertTrue(filter.match(message)); - message = new ServerMessageImpl(); + message = new CoreMessage(); Assert.assertFalse(filter.match(message)); } @@ -94,7 +94,7 @@ public class FilterTest extends SilentTestCase { filter = FilterImpl.createFilter(new SimpleString("AMQDurable='NON_DURABLE'")); - message = new ServerMessageImpl(); + message = new CoreMessage(); message.setDurable(true); Assert.assertFalse(filter.match(message)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index 0e9a3f2..2f18c21 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -23,6 +23,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -43,8 +46,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -329,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase { } @Override - public ServerMessage handleMessage(ServerMessage message) throws Exception { + public ICoreMessage handleMessage(Message message) throws Exception { return null; }