Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D3292200C3E for ; Mon, 6 Mar 2017 12:53:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D000A160B73; Mon, 6 Mar 2017 11:53:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E6DBC160B95 for ; Mon, 6 Mar 2017 12:53:50 +0100 (CET) Received: (qmail 43699 invoked by uid 500); 6 Mar 2017 11:53:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42511 invoked by uid 99); 6 Mar 2017 11:53:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Mar 2017 11:53:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F2C76DFF95; Mon, 6 Mar 2017 11:53:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Mon, 06 Mar 2017 11:54:07 -0000 Message-Id: In-Reply-To: <49415fef219940f3b0d08a6a8ade4334@git.apache.org> References: <49415fef219940f3b0d08a6a8ade4334@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [20/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. 
archived-at: Mon, 06 Mar 2017 11:53:54 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 80116ed..c7a831b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -16,10 +16,13 @@ */ package org.apache.activemq.artemis.api.core; +import java.io.InputStream; +import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.activemq.artemis.utils.UUID; +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.persistence.Persister; /** * A Message is a routable instance that has a payload. @@ -48,9 +51,41 @@ import org.apache.activemq.artemis.utils.UUID; *

* If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a * {@code boolean}), a {@link ActiveMQPropertyConversionException} will be thrown. + * + * + * Use cases that will be covered by Message + * + * Receiving a buffer: + * + * Message encode = new CoreMessage(); // or any other implementation + * encode.receiveBuffer(buffer); + * + * + * Sending to a buffer: + * + * Message encode; + * size = encode.getEncodeSize(); + * encode.encodeDirectly(bufferOutput); + * */ public interface Message { + // This is an estimate of how much memory a Message takes up, excluding body and properties + // Note, it is only an estimate, it's not possible to be entirely sure with Java + // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof + // The value is somewhat higher on 64 bit architectures, probably due to different alignment + int memoryOffset = 352; + + + SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO"); + + SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO"); + + SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO"); + + // used by the bridges to set duplicates + SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP"); + /** * the actual time the message was expired. 
* * * @@ -129,6 +164,91 @@ public interface Message { byte STREAM_TYPE = 6; + + default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) { + Object obj = getDeliveryAnnotationProperty(property); + if (obj instanceof SimpleString) { + return (SimpleString)obj; + } else { + return SimpleString.toSimpleString(obj.toString()); + } + } + + default void cleanupInternalProperties() { + // only on core + } + + RoutingType getRouteType(); + + boolean containsDeliveryAnnotationProperty(SimpleString property); + + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ + @Deprecated + default InputStream getBodyInputStream() { + return null; + } + + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ + @Deprecated + default ActiveMQBuffer getBodyBuffer() { + return null; + } + + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ + @Deprecated + default byte getType() { + return (byte)0; + } + + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ + @Deprecated + default Message setType(byte type) { + return this; + } + + + void messageChanged(); + + /** Used to calculate what is the delivery time. + * Return null if not scheduled. */ + Long getScheduledDeliveryTime(); + + default Message setScheduledDeliveryTime(Long time) { + return this; + } + + /** Context can be used by the application server to inject extra control, like a protocol specific on the server. + * There is only one per Object, use it wisely! + * + * Note: the intent of this was to replace PageStore reference on Message, but it will be later increased by adding a ServerPojo + * */ + RefCountMessageListener getContext(); + + SimpleString getReplyTo(); + + Message setReplyTo(SimpleString address); + + Message setContext(RefCountMessageListener context); + + /** The buffer will belong to this message, until release is called. 
*/ + Message setBuffer(ByteBuf buffer); + + ByteBuf getBuffer(); + + /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ + Message copy(); + + /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ + Message copy(long newID); + /** * Returns the messageID. *
@@ -136,39 +256,45 @@ public interface Message { */ long getMessageID(); + Message setMessageID(long id); + + default boolean isLargeMessage() { + return false; + } + /** - * Returns the userID - this is an optional user specified UUID that can be set to identify the message - * and will be passed around with the message - * - * @return the user id + * Returns the expiration time of this message. */ - UUID getUserID(); + long getExpiration(); /** - * Sets the user ID + * Sets the expiration of this message. * - * @param userID + * @param expiration expiration time */ - Message setUserID(UUID userID); + Message setExpiration(long expiration); /** - * Returns the address this message is sent to. + * Returns whether this message is expired or not. */ - SimpleString getAddress(); + default boolean isExpired() { + if (getExpiration() == 0) { + return false; + } + + return System.currentTimeMillis() - getExpiration() >= 0; + } + /** - * Sets the address to send this message to. + * Returns the userID - this is an optional user specified UUID that can be set to identify the message + * and will be passed around with the message * - * @param address address to send the message to + * @return the user id */ - Message setAddress(SimpleString address); + Object getUserID(); - /** - * Returns this message type. - *

- * See fields {@literal *_TYPE} for possible values. - */ - byte getType(); + Message setUserID(Object userID); /** * Returns whether this message is durable or not. @@ -182,36 +308,18 @@ public interface Message { */ Message setDurable(boolean durable); - /** - * Returns the expiration time of this message. - */ - long getExpiration(); + Persister getPersister(); - /** - * Returns whether this message is expired or not. - */ - boolean isExpired(); + String getAddress(); - /** - * Sets the expiration of this message. - * - * @param expiration expiration time - */ - Message setExpiration(long expiration); + Message setAddress(String address); + + SimpleString getAddressSimpleString(); + + Message setAddress(SimpleString address); - /** - * Returns the message timestamp. - *
- * The timestamp corresponds to the time this message - * was handled by an ActiveMQ Artemis server. - */ long getTimestamp(); - /** - * Sets the message timestamp. - * - * @param timestamp timestamp - */ Message setTimestamp(long timestamp); /** @@ -230,164 +338,116 @@ public interface Message { */ Message setPriority(byte priority); - /** - * Returns the size of the encoded message. - */ - int getEncodeSize(); + /** Used to receive this message from an encoded medium buffer */ + void receiveBuffer(ByteBuf buffer); - /** - * Returns whether this message is a large message or a regular message. - */ - boolean isLargeMessage(); + /** Used to send this message to an encoded medium buffer. + * @param buffer the buffer used. + * @param deliveryCount Some protocols (AMQP) will have this as part of the message. */ + void sendBuffer(ByteBuf buffer, int deliveryCount); - /** - * Returns the message body as an ActiveMQBuffer - */ - ActiveMQBuffer getBodyBuffer(); + int getPersistSize(); - /** - * Writes the input byte array to the message body ActiveMQBuffer - */ - Message writeBodyBufferBytes(byte[] bytes); + void persist(ActiveMQBuffer targetRecord); - /** - * Writes the input String to the message body ActiveMQBuffer - */ - Message writeBodyBufferString(String string); + void reloadPersistence(ActiveMQBuffer record); - /** - * Returns a copy of the message body as an ActiveMQBuffer. Any modification - * of this buffer should not impact the underlying buffer. 
- */ - ActiveMQBuffer getBodyBufferDuplicate(); + default void releaseBuffer() { + ByteBuf buffer = getBuffer(); + if (buffer != null) { + buffer.release(); + } + setBuffer(null); + } + default void referenceOriginalMessage(final Message original, String originalQueue) { + String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString()); - // Properties - // ----------------------------------------------------------------- + if (queueOnMessage != null) { + putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage); + } else if (originalQueue != null) { + putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue); + } - /** - * Puts a boolean property in this message. - * - * @param key property name - * @param value property value - */ - Message putBooleanProperty(SimpleString key, boolean value); + if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { + putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString())); - /** - * @see #putBooleanProperty(SimpleString, boolean) - */ - Message putBooleanProperty(String key, boolean value); + putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString())); + } else { + putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress()); - /** - * Puts a byte property in this message. - * - * @param key property name - * @param value property value - */ - Message putByteProperty(SimpleString key, byte value); + putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID()); + } - /** - * @see #putByteProperty(SimpleString, byte) - */ - Message putByteProperty(String key, byte value); + // reset expiry + setExpiration(0); + } /** - * Puts a byte[] property in this message. 
- * - * @param key property name - * @param value property value + * it will translate a property named HDR_DUPLICATE_DETECTION_ID. + * @return */ - Message putBytesProperty(SimpleString key, byte[] value); + default byte[] getDuplicateIDBytes() { + Object duplicateID = getDuplicateProperty(); - /** - * @see #putBytesProperty(SimpleString, byte[]) - */ - Message putBytesProperty(String key, byte[] value); + if (duplicateID == null) { + return null; + } else { + if (duplicateID instanceof SimpleString) { + return ((SimpleString) duplicateID).getData(); + } else if (duplicateID instanceof String) { + return new SimpleString(duplicateID.toString()).getData(); + } else { + return (byte[]) duplicateID; + } + } + } /** - * Puts a short property in this message. - * - * @param key property name - * @param value property value + * it will translate a property named HDR_DUPLICATE_DETECTION_ID. + * @return */ - Message putShortProperty(SimpleString key, short value); + default Object getDuplicateProperty() { + return getDeliveryAnnotationProperty(Message.HDR_DUPLICATE_DETECTION_ID); + } - /** - * @see #putShortProperty(SimpleString, short) - */ - Message putShortProperty(String key, short value); - /** - * Puts a char property in this message. - * - * @param key property name - * @param value property value - */ - Message putCharProperty(SimpleString key, char value); + Message putBooleanProperty(String key, boolean value); - /** - * @see #putCharProperty(SimpleString, char) - */ - Message putCharProperty(String key, char value); + Message putByteProperty(String key, byte value); - /** - * Puts an int property in this message. 
- * - * @param key property name - * @param value property value - */ - Message putIntProperty(SimpleString key, int value); + Message putBytesProperty(String key, byte[] value); - /** - * @see #putIntProperty(SimpleString, int) - */ - Message putIntProperty(String key, int value); + Message putShortProperty(String key, short value); - /** - * Puts a long property in this message. - * - * @param key property name - * @param value property value - */ - Message putLongProperty(SimpleString key, long value); + Message putCharProperty(String key, char value); - /** - * @see #putLongProperty(SimpleString, long) - */ - Message putLongProperty(String key, long value); + Message putIntProperty(String key, int value); - /** - * Puts a float property in this message. - * - * @param key property name - * @param value property value - */ - Message putFloatProperty(SimpleString key, float value); + Message putLongProperty(String key, long value); - /** - * @see #putFloatProperty(SimpleString, float) - */ Message putFloatProperty(String key, float value); - /** - * Puts a double property in this message. - * - * @param key property name - * @param value property value - */ - Message putDoubleProperty(SimpleString key, double value); - - /** - * @see #putDoubleProperty(SimpleString, double) - */ Message putDoubleProperty(String key, double value); - /** - * Puts a SimpleString property in this message. 
- * - * @param key property name - * @param value property value - */ - Message putStringProperty(SimpleString key, SimpleString value); + + + Message putBooleanProperty(SimpleString key, boolean value); + + Message putByteProperty(SimpleString key, byte value); + + Message putBytesProperty(SimpleString key, byte[] value); + + Message putShortProperty(SimpleString key, short value); + + Message putCharProperty(SimpleString key, char value); + + Message putIntProperty(SimpleString key, int value); + + Message putLongProperty(SimpleString key, long value); + + Message putFloatProperty(SimpleString key, float value); + + Message putDoubleProperty(SimpleString key, double value); /** * Puts a String property in this message. @@ -397,202 +457,127 @@ public interface Message { */ Message putStringProperty(String key, String value); - /** - * Puts an Object property in this message.
- * Accepted types are: - *

    - *
  • Boolean
  • - *
  • Byte
  • - *
  • Short
  • - *
  • Character
  • - *
  • Integer
  • - *
  • Long
  • - *
  • Float
  • - *
  • Double
  • - *
  • String
  • - *
  • SimpleString
  • - *
- * Using any other type will throw a PropertyConversionException. - * - * @param key property name - * @param value property value - * @throws ActiveMQPropertyConversionException if the value is not one of the accepted property - * types. - */ - Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException; - - /** - * @see #putObjectProperty(SimpleString, Object) - */ Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException; - /** - * Removes the property corresponding to the specified key. - * - * @param key property name - * @return the value corresponding to the specified key or @{code null} - */ - Object removeProperty(SimpleString key); + Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException; - /** - * @see #removeProperty(SimpleString) - */ Object removeProperty(String key); - /** - * Returns {@code true} if this message contains a property with the given key, {@code false} else. - * - * @param key property name - */ - boolean containsProperty(SimpleString key); - - /** - * @see #containsProperty(SimpleString) - */ boolean containsProperty(String key); - /** - * Returns the property corresponding to the specified key as a Boolean. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Boolean - */ - Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getBooleanProperty(SimpleString) - */ Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Byte. 
- * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Byte - */ - Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getByteProperty(SimpleString) - */ Byte getByteProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Double. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Double - */ - Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getDoubleProperty(SimpleString) - */ Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as an Integer. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to an Integer - */ - Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getIntProperty(SimpleString) - */ Integer getIntProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Long. 
- * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Long - */ - Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getLongProperty(SimpleString) - */ Long getLongProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key - */ + Object getObjectProperty(String key); + + Short getShortProperty(String key) throws ActiveMQPropertyConversionException; + + Float getFloatProperty(String key) throws ActiveMQPropertyConversionException; + + String getStringProperty(String key) throws ActiveMQPropertyConversionException; + + SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException; + + byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException; + + Object removeProperty(SimpleString key); + + boolean containsProperty(SimpleString key); + + Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException; + Object getObjectProperty(SimpleString key); - /** - * @see #getBooleanProperty(SimpleString) - */ - Object getObjectProperty(String key); + Object removeDeliveryAnnoationProperty(SimpleString key); - /** - * Returns the property corresponding to the specified key as a Short. 
- * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Short - */ - Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; + Object getDeliveryAnnotationProperty(SimpleString key); - /** - * @see #getShortProperty(SimpleString) - */ - Short getShortProperty(String key) throws ActiveMQPropertyConversionException; + Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Float. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Float - */ Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException; - /** - * @see #getFloatProperty(SimpleString) - */ - Float getFloatProperty(String key) throws ActiveMQPropertyConversionException; - - /** - * Returns the property corresponding to the specified key as a String. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a String - */ String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException; - /** - * @see #getStringProperty(SimpleString) - */ - String getStringProperty(String key) throws ActiveMQPropertyConversionException; - - /** - * Returns the property corresponding to the specified key as a SimpleString. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a SimpleString - */ SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException; - /** - * @see #getSimpleStringProperty(SimpleString) - */ - SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException; - - /** - * Returns the property corresponding to the specified key as a byte[]. 
- * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a byte[] - */ byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException; + Message putStringProperty(SimpleString key, SimpleString value); + /** - * @see #getBytesProperty(SimpleString) + * Returns the size of the encoded message. */ - byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException; + int getEncodeSize(); /** * Returns all the names of the properties for this message. */ Set getPropertyNames(); + + + int getRefCount(); + + int incrementRefCount() throws Exception; + + int decrementRefCount() throws Exception; + + int incrementDurableRefCount(); + + int decrementDurableRefCount(); + /** * @return Returns the message in Map form, useful when encoding to JSON */ - Map toMap(); + default Map toMap() { + Map map = toPropertyMap(); + map.put("messageID", getMessageID()); + Object userID = getUserID(); + if (getUserID() != null) { + map.put("userID", "ID:" + userID.toString()); + } + + map.put("address", getAddress()); + map.put("durable", isDurable()); + map.put("expiration", getExpiration()); + map.put("timestamp", getTimestamp()); + map.put("priority", (int)getPriority()); + + return map; + } /** * @return Returns the message properties in Map form, useful when encoding to JSON */ - Map toPropertyMap(); + default Map toPropertyMap() { + Map map = new HashMap<>(); + for (SimpleString name : getPropertyNames()) { + map.put(name.toString(), getObjectProperty(name.toString())); + } + return map; + } + + + /** This should make you convert your message into Core format. 
*/ + ICoreMessage toCore(); + + int getMemoryEstimate(); + + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java new file mode 100644 index 0000000..64dd44d --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.api.core; + +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class RefCountMessage implements Message { + + private final AtomicInteger durableRefCount = new AtomicInteger(); + + private final AtomicInteger refCount = new AtomicInteger(); + + private RefCountMessageListener context; + + @Override + public Message setContext(RefCountMessageListener context) { + this.context = context; + return this; + } + + @Override + public RefCountMessageListener getContext() { + return context; + } + + @Override + public int getRefCount() { + return refCount.get(); + } + + @Override + public int incrementRefCount() throws Exception { + int count = refCount.incrementAndGet(); + if (context != null) { + context.nonDurableUp(this, count); + } + return count; + } + + @Override + public int incrementDurableRefCount() { + int count = durableRefCount.incrementAndGet(); + if (context != null) { + context.durableUp(this, count); + } + return count; + } + + @Override + public int decrementDurableRefCount() { + int count = durableRefCount.decrementAndGet(); + if (context != null) { + context.durableDown(this, count); + } + return count; + } + + @Override + public int decrementRefCount() throws Exception { + int count = refCount.decrementAndGet(); + if (context != null) { + context.nonDurableDown(this, count); + } + return count; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java new file mode 100644 index 0000000..e68dffd --- /dev/null +++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +/** If {@link Message#getContext()} != null and is implementing this interface. 
+ * These methods will be called during refCount operations */ +public interface RefCountMessageListener { + + void durableUp(Message message, int durableCount); + + void durableDown(Message message, int durableCount); + + void nonDurableUp(Message message, int nonDurableCoun); + + void nonDurableDown(Message message, int nonDurableCoun); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java index e87d365..67f2150 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java @@ -19,14 +19,15 @@ package org.apache.activemq.artemis.api.core.client; import java.io.InputStream; import java.io.OutputStream; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; /** * A ClientMessage represents a message sent and/or received by ActiveMQ Artemis. */ -public interface ClientMessage extends Message { +public interface ClientMessage extends ICoreMessage { /** * Returns the number of times this message was delivered. 
@@ -123,135 +124,141 @@ public interface ClientMessage extends Message { ClientMessage setBodyInputStream(InputStream bodyInputStream); /** - * Overridden from {@link Message} to enable fluent API + * Return the bodyInputStream for large messages + * @return + */ + @Override + InputStream getBodyInputStream(); + + /** + * The buffer to write the body. + * @return + */ + @Override + ActiveMQBuffer getBodyBuffer(); + + /** + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBooleanProperty(SimpleString key, boolean value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBooleanProperty(String key, boolean value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putByteProperty(SimpleString key, byte value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putByteProperty(String key, byte value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBytesProperty(SimpleString key, byte[] value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBytesProperty(String key, byte[] value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putShortProperty(SimpleString key, short value); /** - * Overridden from {@link 
Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putShortProperty(String key, short value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putCharProperty(SimpleString key, char value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putCharProperty(String key, char value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putIntProperty(SimpleString key, int value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putIntProperty(String key, int value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putLongProperty(SimpleString key, long value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putLongProperty(String key, long value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putFloatProperty(SimpleString key, float value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putFloatProperty(String key, float 
value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putDoubleProperty(SimpleString key, double value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putDoubleProperty(String key, double value); /** - * Overridden from {@link Message} to enable fluent API - */ - @Override - ClientMessage putStringProperty(SimpleString key, SimpleString value); - - /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putStringProperty(String key, String value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ - @Override ClientMessage writeBodyBufferBytes(byte[] bytes); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ - @Override ClientMessage writeBodyBufferString(String string); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index 40211c1..946285d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.management; import javax.json.JsonArray; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; /** * Helper class to use ActiveMQ Artemis Core messages to manage server resources. @@ -86,7 +88,7 @@ public final class ManagementHelper { * @param attribute the name of the attribute * @see ResourceNames */ - public static void putAttribute(final Message message, final String resourceName, final String attribute) { + public static void putAttribute(final ICoreMessage message, final String resourceName, final String attribute) { message.putStringProperty(ManagementHelper.HDR_RESOURCE_NAME, new SimpleString(resourceName)); message.putStringProperty(ManagementHelper.HDR_ATTRIBUTE, new SimpleString(attribute)); } @@ -99,7 +101,7 @@ public final class ManagementHelper { * @param operationName the name of the operation to invoke on the resource * @see ResourceNames */ - public static void putOperationInvocation(final Message message, + public static void putOperationInvocation(final ICoreMessage message, final String resourceName, final String operationName) throws Exception { ManagementHelper.putOperationInvocation(message, resourceName, operationName, (Object[]) null); @@ -114,7 +116,7 @@ public final class ManagementHelper { * @param parameters the parameters to use to invoke the server resource * @see ResourceNames */ - public static void putOperationInvocation(final Message message, + public static void putOperationInvocation(final ICoreMessage message, final String resourceName, final String operationName, final Object... 
parameters) throws Exception { @@ -141,7 +143,7 @@ public final class ManagementHelper { * Used by ActiveMQ Artemis management service. */ public static Object[] retrieveOperationParameters(final Message message) throws Exception { - SimpleString sstring = message.getBodyBuffer().readNullableSimpleString(); + SimpleString sstring = message.toCore().getReadOnlyBodyBuffer().readNullableSimpleString(); String jsonString = (sstring == null) ? null : sstring.toString(); if (jsonString != null) { @@ -170,7 +172,7 @@ public final class ManagementHelper { /** * Used by ActiveMQ Artemis management service. */ - public static void storeResult(final Message message, final Object result) throws Exception { + public static void storeResult(final CoreMessage message, final Object result) throws Exception { String resultString; if (result != null) { @@ -192,7 +194,7 @@ public final class ManagementHelper { * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}. * and the result will be a String corresponding to the server exception. */ - public static Object[] getResults(final Message message) throws Exception { + public static Object[] getResults(final ICoreMessage message) throws Exception { SimpleString sstring = message.getBodyBuffer().readNullableSimpleString(); String jsonString = (sstring == null) ? null : sstring.toString(); @@ -210,7 +212,7 @@ public final class ManagementHelper { * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}. * and the result will be a String corresponding to the server exception. */ - public static Object getResult(final Message message) throws Exception { + public static Object getResult(final ICoreMessage message) throws Exception { return getResult(message, null); } @@ -220,7 +222,7 @@ public final class ManagementHelper { * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}. 
* and the result will be a String corresponding to the server exception. */ - public static Object getResult(final Message message, Class desiredType) throws Exception { + public static Object getResult(final ICoreMessage message, Class desiredType) throws Exception { Object[] res = ManagementHelper.getResults(message); if (res != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java index 900305f..b5d5474 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java @@ -20,18 +20,18 @@ import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; /** * A ResetLimitWrappedActiveMQBuffer - * TODO: Move this to commons */ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper { private final int limit; - private MessageInternal message; + private Message message; /** * We need to turn of notifications of body changes on reset on the server side when dealing with AMQP conversions, @@ -39,17 +39,17 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper * * @param message */ - public void setMessage(MessageInternal message) { + public void 
setMessage(Message message) { this.message = message; } - public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) { + public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final Message message) { // a wrapped inside a wrapper will increase the stack size. // we fixed this here due to some profiling testing this(limit, unwrap(buffer.byteBuf()).duplicate(), message); } - public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) { + public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final Message message) { // a wrapped inside a wrapper will increase the stack size. // we fixed this here due to some profiling testing super(buffer); @@ -67,7 +67,7 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper private void changed() { if (message != null) { - message.bodyChanged(); + message.messageChanged(); } } @@ -94,8 +94,6 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper @Override public void resetReaderIndex() { - changed(); - buffer.readerIndex(limit); } @@ -256,6 +254,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper super.writeBytes(src); } + + @Override + public void writeBytes(final ByteBuf src, final int srcIndex, final int length) { + changed(); + + super.writeBytes(src, srcIndex, length); + } + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { changed(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 2b4ab7e..82af968 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -569,7 +569,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { private void handleRegularMessage(ClientMessageInternal message) { if (message.getAddress() == null) { - message.setAddressTransient(queueInfo.getAddress()); + message.setAddress(queueInfo.getAddress()); } message.onReceipt(this); @@ -625,7 +625,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { currentLargeMessageController.setLocal(true); //sets the packet - ActiveMQBuffer qbuff = clMessage.getBodyBuffer(); + ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer(); int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex(); final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java index c3cbceb..cbfaf6f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java @@ -59,7 +59,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public int getEncodeSize() { - if (bodyBuffer != null) { 
+ if (writableBuffer != null) { return super.getEncodeSize(); } else { return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize(); @@ -93,7 +93,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C throw new RuntimeException(e.getMessage(), e); } - return bodyBuffer; + return writableBuffer; } @Override @@ -108,7 +108,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public void saveToOutputStream(final OutputStream out) throws ActiveMQException { - if (bodyBuffer != null) { + if (writableBuffer != null) { // The body was rebuilt on the client, so we need to behave as a regular message on this case super.saveToOutputStream(out); } else { @@ -118,7 +118,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException { - if (bodyBuffer != null) { + if (writableBuffer != null) { super.setOutputStream(out); } else { largeMessageController.setOutputStream(out); @@ -129,7 +129,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException { - if (bodyBuffer != null) { + if (writableBuffer != null) { return super.waitOutputStreamCompletion(timeMilliseconds); } else { return largeMessageController.waitCompletion(timeMilliseconds); @@ -138,7 +138,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public void discardBody() { - if (bodyBuffer != null) { + if (writableBuffer != null) { super.discardBody(); } else { largeMessageController.discardUnusedPackets(); @@ -146,17 +146,17 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C } private void checkBuffer() throws ActiveMQException { - if (bodyBuffer == null) { + if (writableBuffer == 
null) { long bodySize = this.largeMessageSize + BODY_OFFSET; if (bodySize > Integer.MAX_VALUE) { bodySize = Integer.MAX_VALUE; } - createBody((int) bodySize); + initBuffer((int) bodySize); - bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); + writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this); - largeMessageController.saveBuffer(new ActiveMQOutputStream(bodyBuffer)); + largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer)); } } @@ -178,7 +178,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C public void retrieveExistingData(ClientMessageInternal clMessage) { this.messageID = clMessage.getMessageID(); - this.address = clMessage.getAddress(); + this.address = clMessage.getAddressSimpleString(); this.setUserID(clMessage.getUserID()); this.setFlowControlSize(clMessage.getFlowControlSize()); this.setDeliveryCount(clMessage.getDeliveryCount()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 7bf8eb7..252ae86 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -28,14 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import 
org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.TypedProperties; +import org.apache.activemq.artemis.utils.UUID; /** * A ClientMessageImpl */ -public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal { +public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal { // added this constant here so that the client package have no dependency on JMS public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME; @@ -57,6 +59,35 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter public ClientMessageImpl() { } + protected ClientMessageImpl(ClientMessageImpl other) { + super(other); + } + + @Override + public ClientMessageImpl setDurable(boolean durable) { + super.setDurable(durable); + return this; + } + + @Override + public ClientMessageImpl setExpiration(long expiration) { + super.setExpiration(expiration); + return this; + } + + @Override + public ClientMessageImpl setPriority(byte priority) { + super.setPriority(priority); + return this; + } + + @Override + public ClientMessageImpl setUserID(UUID userID) { + + return this; + } + + /* * Construct messages before sending */ @@ -66,12 +97,13 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter final long timestamp, final byte priority, final int initialMessageBufferSize) { - super(type, durable, expiration, timestamp, priority, initialMessageBufferSize); + this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable). 
+ setPriority(priority).initBuffer(initialMessageBufferSize); } @Override - public boolean isServerMessage() { - return false; + public TypedProperties getProperties() { + return this.checkProperties(); } @Override @@ -108,6 +140,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter return this; } + + @Override + public void checkCompletion() throws ActiveMQException { + } + @Override public int getFlowControlSize() { if (flowControlSize < 0) { @@ -141,7 +178,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public String toString() { - return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; + return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? 
getUserID() : "null") + ",properties=" + getProperties().toString() + "]"; } @Override @@ -189,7 +226,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter } @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { return new DecodingContext(); } @@ -307,15 +344,17 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) { - return (ClientMessageImpl) super.writeBodyBufferBytes(bytes); + getBodyBuffer().writeBytes(bytes); + return this; } @Override public ClientMessageImpl writeBodyBufferString(String string) { - return (ClientMessageImpl) super.writeBodyBufferString(string); + getBodyBuffer().writeString(string); + return this; } - private final class DecodingContext implements BodyEncoder { + private final class DecodingContext implements LargeBodyEncoder { private DecodingContext() { } @@ -347,9 +386,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public int encode(final ActiveMQBuffer bufferOut, final int size) { byte[] bytes = new byte[size]; - getWholeBuffer().readBytes(bytes); + buffer.readBytes(bytes); bufferOut.writeBytes(bytes, 0, size); return size; } } + + @Override + public Message copy() { + return new ClientMessageImpl(this); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java index 07d4719..4b87878 100644 --- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.client.impl; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.utils.TypedProperties; @@ -34,8 +33,6 @@ public interface ClientMessageInternal extends ClientMessage { */ void setFlowControlSize(int flowControlSize); - void setAddressTransient(SimpleString address); - void onReceipt(ClientConsumerInternal consumer); /** @@ -44,4 +41,5 @@ public interface ClientMessageInternal extends ClientMessage { void discardBody(); boolean isCompressed(); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index 1dfbe72..ce4a8a1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -23,12 +23,12 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import 
org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; import org.apache.activemq.artemis.utils.DeflaterReader; @@ -208,7 +208,7 @@ public class ClientProducerImpl implements ClientProducerInternal { } private void doSend(SimpleString sendingAddress, - final Message msg, + final Message msgToSend, final SendAcknowledgementHandler handler, final boolean forceAsync) throws ActiveMQException { if (sendingAddress == null) { @@ -217,7 +217,8 @@ public class ClientProducerImpl implements ClientProducerInternal { session.startCall(); try { - MessageInternal msgI = (MessageInternal) msg; + // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core + ICoreMessage msg = msgToSend.toCore(); ClientProducerCredits theCredits; @@ -225,8 +226,8 @@ public class ClientProducerImpl implements ClientProducerInternal { // a note about the second check on the writerIndexSize, // If it's a server's message, it means this is being done through the bridge or some special consumer on the // server's on which case we can't' convert the message into large at the servers - if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || - msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) { + if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msg) != null || msg.isLargeMessage() || + msg.getBodyBuffer().writerIndex() > minLargeMessageSize)) { isLarge = true; } else { isLarge = false; @@ -248,27 +249,31 @@ 
public class ClientProducerImpl implements ClientProducerInternal { } if (groupID != null) { - msgI.putStringProperty(Message.HDR_GROUP_ID, groupID); + msg.putStringProperty(Message.HDR_GROUP_ID, groupID); } - final boolean sendBlockingConfig = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend; + final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend; final boolean forceAsyncOverride = handler != null; final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride; session.workDone(); if (isLarge) { - largeMessageSend(sendBlocking, msgI, theCredits, handler); + largeMessageSend(sendBlocking, msg, theCredits, handler); } else { - sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler); + sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler); } } finally { session.endCall(); } } + private InputStream getBodyInputStream(ICoreMessage msgI) { + return msgI.getBodyInputStream(); + } + private void sendRegularMessage(final SimpleString sendingAddress, - final MessageInternal msgI, + final ICoreMessage msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws ActiveMQException { @@ -301,7 +306,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSend(final boolean sendBlocking, - final MessageInternal msgI, + final ICoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking); @@ -313,22 +318,22 @@ public class ClientProducerImpl implements ClientProducerInternal { } // msg.getBody() could be Null on LargeServerMessage - if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) { - msgI.getWholeBuffer().readerIndex(0); + if (getBodyInputStream(msgI) == null && msgI.getBuffer() != null) { + 
msgI.getBuffer().readerIndex(0); } InputStream input; if (msgI.isServerMessage()) { largeMessageSendServer(sendBlocking, msgI, credits, handler); - } else if ((input = msgI.getBodyInputStream()) != null) { + } else if ((input = getBodyInputStream(msgI)) != null) { largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler); } else { largeMessageSendBuffered(sendBlocking, msgI, credits, handler); } } - private void sendInitialLargeMessageHeader(MessageInternal msgI, + private void sendInitialLargeMessageHeader(Message msgI, ClientProducerCredits credits) throws ActiveMQException { int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI); @@ -348,17 +353,14 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendServer(final boolean sendBlocking, - final MessageInternal msgI, + final ICoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { sendInitialLargeMessageHeader(msgI, credits); - BodyEncoder context = msgI.getBodyEncoder(); + LargeBodyEncoder context = msgI.getBodyEncoder(); final long bodySize = context.getLargeBodySize(); - - final int reconnectID = sessionContext.getReconnectID(); - context.open(); try { @@ -392,7 +394,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendBuffered(final boolean sendBlocking, - final MessageInternal msgI, + final ICoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { msgI.getBodyBuffer().readerIndex(0); @@ -407,7 +409,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendStreamed(final boolean sendBlocking, - final MessageInternal msgI, + final ICoreMessage msgI, final InputStream inputStreamParameter, final ClientProducerCredits credits, 
SendAcknowledgementHandler handler) throws ActiveMQException { @@ -478,7 +480,7 @@ public class ClientProducerImpl implements ClientProducerInternal { msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize()); msgI.getBodyBuffer().writeBytes(buff, 0, pos); - sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler); + sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler); return; } else { if (!headerSent) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 55f9129..ce652d2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + + @Override public ByteBuffer toByteBuffer() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index 951aea2..0bb5690 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + /** + * Transfers the specified source buffer's data to this buffer starting at + * the current {@code writerIndex} until the source buffer's position + * reaches its limit, and increases the {@code writerIndex} by the + * number of the transferred bytes. + * + * @param src The source buffer + * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than + * {@code this.writableBytes} + */ + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + public int writeBytes(final InputStream in, final int length) throws IOException { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java deleted file mode 100644 index baafaac..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java +++ 
/dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.message; - -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; - -/** - * Class used to encode message body into buffers. - *
- * Used to send large streams over the wire - */ -public interface BodyEncoder { - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - void open() throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - void close() throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - int encode(ByteBuffer bufferRead) throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - long getLargeBodySize(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java new file mode 100644 index 0000000..8b96282 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message; + +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; + +/** + * Class used to encode message body into buffers. + *
+ * Used to send large streams over the wire + */ +public interface LargeBodyEncoder { + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + void open() throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + void close() throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + int encode(ByteBuffer bufferRead) throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + long getLargeBodySize(); +}