Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B953318DE5 for ; Mon, 10 Aug 2015 15:13:03 +0000 (UTC) Received: (qmail 42567 invoked by uid 500); 10 Aug 2015 15:13:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 42515 invoked by uid 500); 10 Aug 2015 15:13:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42485 invoked by uid 99); 10 Aug 2015 15:13:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Aug 2015 15:13:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5A257E0509; Mon, 10 Aug 2015 15:13:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 10 Aug 2015 15:13:01 -0000 Message-Id: <747cff9d9d2f4bc0afade0e02ac874bf@git.apache.org> In-Reply-To: <6d75255d38c94b2f8601b49eb76c9d59@git.apache.org> References: <6d75255d38c94b2f8601b49eb76c9d59@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/53] [abbrv] [partial] activemq-artemis git commit: automatic checkstyle change http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java index 81699e8..315dd12 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -30,256 +30,207 @@ import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.apache.activemq.artemis.reader.MessageUtil; -public class ServerJMSMessage implements Message -{ +public class ServerJMSMessage implements Message { + protected final MessageInternal message; protected int deliveryCount; - public MessageInternal getInnerMessage() - { + public MessageInternal getInnerMessage() { return message; } - - public ServerJMSMessage(MessageInternal message, int deliveryCount) - { + public ServerJMSMessage(MessageInternal message, int deliveryCount) { this.message = message; this.deliveryCount = deliveryCount; } - @Override - public final String getJMSMessageID() throws JMSException - { + public final String getJMSMessageID() throws JMSException { return null; } @Override - public final void setJMSMessageID(String id) throws JMSException - { + public final void setJMSMessageID(String id) throws JMSException { } @Override - public final long getJMSTimestamp() throws JMSException - { + public final long getJMSTimestamp() throws JMSException { return message.getTimestamp(); } @Override - public final void setJMSTimestamp(long timestamp) throws JMSException - { + public final void setJMSTimestamp(long timestamp) throws JMSException { message.setTimestamp(timestamp); } - @Override - public final byte[] getJMSCorrelationIDAsBytes() throws JMSException - { + public final byte[] getJMSCorrelationIDAsBytes() throws JMSException { return MessageUtil.getJMSCorrelationIDAsBytes(message); } @Override - public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException - { - try - { + public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException { + try { MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID); } - catch (ActiveMQException e) - { + catch (ActiveMQException e) { throw new JMSException(e.getMessage()); } } @Override - public final void setJMSCorrelationID(String correlationID) throws JMSException - { + public final void setJMSCorrelationID(String correlationID) throws JMSException { MessageUtil.setJMSCorrelationID(message, correlationID); } @Override - public final String getJMSCorrelationID() throws JMSException - { + public final String getJMSCorrelationID() throws JMSException { return MessageUtil.getJMSCorrelationID(message); } @Override - public final Destination getJMSReplyTo() throws JMSException - { + public final Destination getJMSReplyTo() throws JMSException { SimpleString reply = MessageUtil.getJMSReplyTo(message); - if (reply != null) - { + if (reply != null) { return ActiveMQDestination.fromAddress(reply.toString()); } - else - { + else { return null; } } @Override - public final void setJMSReplyTo(Destination replyTo) throws JMSException - { + public final void setJMSReplyTo(Destination replyTo) throws JMSException { MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((ActiveMQDestination) replyTo).getSimpleAddress()); } - public final Destination getJMSDestination() throws JMSException - { + public final Destination getJMSDestination() throws JMSException { SimpleString sdest = message.getAddress(); - if (sdest == null) - { + if (sdest == null) { return null; } - else - { - if (!sdest.toString().startsWith("jms.")) - { + else { + if (!sdest.toString().startsWith("jms.")) { return new ActiveMQQueue(sdest.toString(), sdest.toString()); } - else - { + else { return ActiveMQDestination.fromAddress(sdest.toString()); } } } @Override - public final void setJMSDestination(Destination destination) throws JMSException - { - if (destination == null) - { + public final void setJMSDestination(Destination destination) throws JMSException { + if (destination == null) { message.setAddress(null); } - else - { + else { message.setAddress(((ActiveMQDestination) destination).getSimpleAddress()); } } @Override - public final int getJMSDeliveryMode() throws JMSException - { + public final int getJMSDeliveryMode() throws JMSException { return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; } @Override - public final void setJMSDeliveryMode(int deliveryMode) throws JMSException - { - if (deliveryMode == DeliveryMode.PERSISTENT) - { + public final void setJMSDeliveryMode(int deliveryMode) throws JMSException { + if (deliveryMode == DeliveryMode.PERSISTENT) { message.setDurable(true); } - else if (deliveryMode == DeliveryMode.NON_PERSISTENT) - { + else if (deliveryMode == DeliveryMode.NON_PERSISTENT) { message.setDurable(false); } - else - { + else { throw new JMSException("Invalid mode " + deliveryMode); } } @Override - public final boolean getJMSRedelivered() throws JMSException - { + public final boolean getJMSRedelivered() throws JMSException { return false; } @Override - public final void setJMSRedelivered(boolean redelivered) throws JMSException - { + public final void setJMSRedelivered(boolean redelivered) throws JMSException { // no op } @Override - public final String getJMSType() throws JMSException - { + public final String getJMSType() throws JMSException { return MessageUtil.getJMSType(message); } @Override - public final void setJMSType(String type) throws JMSException - { + public final void setJMSType(String type) throws JMSException { MessageUtil.setJMSType(message, type); } @Override - public final long getJMSExpiration() throws JMSException - { + public final long getJMSExpiration() throws JMSException { return message.getExpiration(); } @Override - public final void setJMSExpiration(long expiration) throws JMSException - { + public final void setJMSExpiration(long expiration) throws JMSException { message.setExpiration(expiration); } @Override - public final long getJMSDeliveryTime() throws JMSException - { + public final long getJMSDeliveryTime() throws JMSException { // no op return 0; } @Override - public final void setJMSDeliveryTime(long deliveryTime) throws JMSException - { + public final void setJMSDeliveryTime(long deliveryTime) throws JMSException { // no op } @Override - public final int getJMSPriority() throws JMSException - { + public final int getJMSPriority() throws JMSException { return message.getPriority(); } @Override - public final void setJMSPriority(int priority) throws JMSException - { + public final void setJMSPriority(int priority) throws JMSException { message.setPriority((byte) priority); } @Override - public final void clearProperties() throws JMSException - { + public final void clearProperties() throws JMSException { MessageUtil.clearProperties(message); } @Override - public final boolean propertyExists(String name) throws JMSException - { + public final boolean propertyExists(String name) throws JMSException { return MessageUtil.propertyExists(message, name); } @Override - public final boolean getBooleanProperty(String name) throws JMSException - { + public final boolean getBooleanProperty(String name) throws JMSException { return message.getBooleanProperty(name); } @Override - public final byte getByteProperty(String name) throws JMSException - { + public final byte getByteProperty(String name) throws JMSException { return message.getByteProperty(name); } @Override - public final short getShortProperty(String name) throws JMSException - { + public final short getShortProperty(String name) throws JMSException { return message.getShortProperty(name); } @Override - public final int getIntProperty(String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { + public final int getIntProperty(String name) throws JMSException { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return deliveryCount; } @@ -287,10 +238,8 @@ public class ServerJMSMessage implements Message } @Override - public final long getLongProperty(String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { + public final long getLongProperty(String name) throws JMSException { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return deliveryCount; } @@ -298,115 +247,95 @@ public class ServerJMSMessage implements Message } @Override - public final float getFloatProperty(String name) throws JMSException - { + public final float getFloatProperty(String name) throws JMSException { return message.getFloatProperty(name); } @Override - public final double getDoubleProperty(String name) throws JMSException - { + public final double getDoubleProperty(String name) throws JMSException { return message.getDoubleProperty(name); } @Override - public final String getStringProperty(String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { + public final String getStringProperty(String name) throws JMSException { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return String.valueOf(deliveryCount); } - return message.getStringProperty(name); } @Override - public final Object getObjectProperty(String name) throws JMSException - { + public final Object getObjectProperty(String name) throws JMSException { Object val = message.getObjectProperty(name); - if (val instanceof SimpleString) - { - val = ((SimpleString)val).toString(); + if (val instanceof SimpleString) { + val = ((SimpleString) val).toString(); } return val; } @Override - public final Enumeration getPropertyNames() throws JMSException - { + public final Enumeration getPropertyNames() throws JMSException { return Collections.enumeration(MessageUtil.getPropertyNames(message)); } @Override - public final void setBooleanProperty(String name, boolean value) throws JMSException - { + public final void setBooleanProperty(String name, boolean value) throws JMSException { message.putBooleanProperty(name, value); } @Override - public final void setByteProperty(String name, byte value) throws JMSException - { + public final void setByteProperty(String name, byte value) throws JMSException { message.putByteProperty(name, value); } @Override - public final void setShortProperty(String name, short value) throws JMSException - { + public final void setShortProperty(String name, short value) throws JMSException { message.putShortProperty(name, value); } @Override - public final void setIntProperty(String name, int value) throws JMSException - { + public final void setIntProperty(String name, int value) throws JMSException { message.putIntProperty(name, value); } @Override - public final void setLongProperty(String name, long value) throws JMSException - { + public final void setLongProperty(String name, long value) throws JMSException { message.putLongProperty(name, value); } @Override - public final void setFloatProperty(String name, float value) throws JMSException - { + public final void setFloatProperty(String name, float value) throws JMSException { message.putFloatProperty(name, value); } @Override - public final void setDoubleProperty(String name, double value) throws JMSException - { + public final void setDoubleProperty(String name, double value) throws JMSException { message.putDoubleProperty(name, value); } @Override - public final void setStringProperty(String name, String value) throws JMSException - { + public final void setStringProperty(String name, String value) throws JMSException { message.putStringProperty(name, value); } @Override - public final void setObjectProperty(String name, Object value) throws JMSException - { + public final void setObjectProperty(String name, Object value) throws JMSException { message.putObjectProperty(name, value); } @Override - public final void acknowledge() throws JMSException - { + public final void acknowledge() throws JMSException { // no op } @Override - public void clearBody() throws JMSException - { + public void clearBody() throws JMSException { message.getBodyBuffer().clear(); } @Override - public final T getBody(Class c) throws JMSException - { + public final T getBody(Class c) throws JMSException { // no op.. jms2 not used on the conversion return null; } @@ -414,20 +343,16 @@ public class ServerJMSMessage implements Message /** * Encode the body into the internal message */ - public void encode() throws Exception - { + public void encode() throws Exception { message.getBodyBuffer().resetReaderIndex(); } - - public void decode() throws Exception - { + public void decode() throws Exception { message.getBodyBuffer().resetReaderIndex(); } @Override - public final boolean isBodyAssignableTo(Class c) throws JMSException - { + public final boolean isBodyAssignableTo(Class c) throws JMSException { // no op.. jms2 not used on the conversion return false; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java index 6b4e0c8..1afc8eb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java @@ -40,168 +40,130 @@ import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadObj import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadShort; import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadString; -public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage -{ +public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage { + public static final byte TYPE = Message.STREAM_TYPE; private int bodyLength = 0; - - public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) - { + public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) { super(message, deliveryCount); } // StreamMessage implementation ---------------------------------- - public boolean readBoolean() throws JMSException - { - try - { + public boolean readBoolean() throws JMSException { + try { return streamReadBoolean(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public byte readByte() throws JMSException - { - try - { + public byte readByte() throws JMSException { + try { return streamReadByte(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public short readShort() throws JMSException - { + public short readShort() throws JMSException { - try - { + try { return streamReadShort(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public char readChar() throws JMSException - { + public char readChar() throws JMSException { - try - { + try { return streamReadChar(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public int readInt() throws JMSException - { + public int readInt() throws JMSException { - try - { + try { return streamReadInteger(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public long readLong() throws JMSException - { + public long readLong() throws JMSException { - try - { + try { return streamReadLong(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public float readFloat() throws JMSException - { + public float readFloat() throws JMSException { - try - { + try { return streamReadFloat(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public double readDouble() throws JMSException - { + public double readDouble() throws JMSException { - try - { + try { return streamReadDouble(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public String readString() throws JMSException - { + public String readString() throws JMSException { - try - { + try { return streamReadString(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } @@ -211,209 +173,170 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St */ private int len = 0; - public int readBytes(final byte[] value) throws JMSException - { + public int readBytes(final byte[] value) throws JMSException { - try - { + try { Pair pairRead = streamReadBytes(message, len, value); len = pairRead.getA(); return pairRead.getB(); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public Object readObject() throws JMSException - { + public Object readObject() throws JMSException { - if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) - { + if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) { throw new MessageEOFException(""); } - try - { + try { return streamReadObject(message); } - catch (IllegalStateException e) - { + catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); } - catch (IndexOutOfBoundsException e) - { + catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); } } - public void writeBoolean(final boolean value) throws JMSException - { + public void writeBoolean(final boolean value) throws JMSException { getBuffer().writeByte(DataConstants.BOOLEAN); getBuffer().writeBoolean(value); } - public void writeByte(final byte value) throws JMSException - { + public void writeByte(final byte value) throws JMSException { getBuffer().writeByte(DataConstants.BYTE); getBuffer().writeByte(value); } - public void writeShort(final short value) throws JMSException - { + public void writeShort(final short value) throws JMSException { getBuffer().writeByte(DataConstants.SHORT); getBuffer().writeShort(value); } - public void writeChar(final char value) throws JMSException - { + public void writeChar(final char value) throws JMSException { getBuffer().writeByte(DataConstants.CHAR); getBuffer().writeShort((short) value); } - public void writeInt(final int value) throws JMSException - { + public void writeInt(final int value) throws JMSException { getBuffer().writeByte(DataConstants.INT); getBuffer().writeInt(value); } - public void writeLong(final long value) throws JMSException - { + public void writeLong(final long value) throws JMSException { getBuffer().writeByte(DataConstants.LONG); getBuffer().writeLong(value); } - public void writeFloat(final float value) throws JMSException - { + public void writeFloat(final float value) throws JMSException { getBuffer().writeByte(DataConstants.FLOAT); getBuffer().writeInt(Float.floatToIntBits(value)); } - public void writeDouble(final double value) throws JMSException - { + public void writeDouble(final double value) throws JMSException { getBuffer().writeByte(DataConstants.DOUBLE); getBuffer().writeLong(Double.doubleToLongBits(value)); } - public void writeString(final String value) throws JMSException - { + public void writeString(final String value) throws JMSException { getBuffer().writeByte(DataConstants.STRING); getBuffer().writeNullableString(value); } - public void writeBytes(final byte[] value) throws JMSException - { + public void writeBytes(final byte[] value) throws JMSException { getBuffer().writeByte(DataConstants.BYTES); getBuffer().writeInt(value.length); getBuffer().writeBytes(value); } - public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException - { + public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException { getBuffer().writeByte(DataConstants.BYTES); getBuffer().writeInt(length); getBuffer().writeBytes(value, offset, length); } - public void writeObject(final Object value) throws JMSException - { - if (value instanceof String) - { + public void writeObject(final Object value) throws JMSException { + if (value instanceof String) { writeString((String) value); } - else if (value instanceof Boolean) - { + else if (value instanceof Boolean) { writeBoolean((Boolean) value); } - else if (value instanceof Byte) - { + else if (value instanceof Byte) { writeByte((Byte) value); } - else if (value instanceof Short) - { + else if (value instanceof Short) { writeShort((Short) value); } - else if (value instanceof Integer) - { + else if (value instanceof Integer) { writeInt((Integer) value); } - else if (value instanceof Long) - { + else if (value instanceof Long) { writeLong((Long) value); } - else if (value instanceof Float) - { + else if (value instanceof Float) { writeFloat((Float) value); } - else if (value instanceof Double) - { + else if (value instanceof Double) { writeDouble((Double) value); } - else if (value instanceof byte[]) - { + else if (value instanceof byte[]) { writeBytes((byte[]) value); } - else if (value instanceof Character) - { + else if (value instanceof Character) { writeChar((Character) value); } - else if (value == null) - { + else if (value == null) { writeString(null); } - else - { + else { throw new MessageFormatException("Invalid object type: " + value.getClass()); } } - public void reset() throws JMSException - { + public void reset() throws JMSException { getBuffer().resetReaderIndex(); } // ActiveMQRAMessage overrides ---------------------------------------- @Override - public void clearBody() throws JMSException - { + public void clearBody() throws JMSException { super.clearBody(); getBuffer().clear(); } - private ActiveMQBuffer getBuffer() - { + private ActiveMQBuffer getBuffer() { return message.getBodyBuffer(); } - - public void decode() throws Exception - { + public void decode() throws Exception { super.decode(); } /** * Encode the body into the internal message */ - public void encode() throws Exception - { + public void encode() throws Exception { super.encode(); bodyLength = message.getEndOfBodyPosition(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java index 7ef7042..95e24b5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java @@ -26,14 +26,12 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal; import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText; import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText; - /** * ActiveMQ Artemis implementation of a JMS TextMessage. *
* This class was ported from SpyTextMessage in JBossMQ. */ -public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage -{ +public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage { // Constants ----------------------------------------------------- public static final byte TYPE = Message.TEXT_TYPE; @@ -51,56 +49,45 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag /* * This constructor is used to construct messages prior to sending */ - public ServerJMSTextMessage(MessageInternal message, int deliveryCount) - { + public ServerJMSTextMessage(MessageInternal message, int deliveryCount) { super(message, deliveryCount); } // TextMessage implementation ------------------------------------ - public void setText(final String text) throws JMSException - { - if (text != null) - { + public void setText(final String text) throws JMSException { + if (text != null) { this.text = new SimpleString(text); } - else - { + else { this.text = null; } writeBodyText(message, this.text); } - public String getText() - { - if (text != null) - { + public String getText() { + if (text != null) { return text.toString(); } - else - { + else { return null; } } @Override - public void clearBody() throws JMSException - { + public void clearBody() throws JMSException { super.clearBody(); text = null; } - - public void encode() throws Exception - { + public void encode() throws Exception { super.encode(); writeBodyText(message, text); } - public void decode() throws Exception - { + public void decode() throws Exception { super.decode(); text = readBodyText(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java index 9b62b49..03c6474 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java @@ -34,8 +34,8 @@ import org.proton.plug.AMQPSessionCallback; import org.proton.plug.ServerSASL; import org.proton.plug.sasl.AnonymousServerSASL; -public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback -{ +public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback { + private final ProtonProtocolManager manager; private final Connection connection; @@ -46,80 +46,64 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback private final ReusableLatch latch = new ReusableLatch(0); - public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) - { + public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) { this.manager = manager; this.connection = connection; } @Override - public ServerSASL[] getSASLMechnisms() - { + public ServerSASL[] getSASLMechnisms() { return new ServerSASL[]{new AnonymousServerSASL(), new ActiveMQPlainSASL(manager.getServer().getSecurityStore(), manager.getServer().getSecurityManager())}; } @Override - public void close() - { + public void close() { } - public Executor getExeuctor() - { - if (protonConnectionDelegate != null) - { + public Executor getExeuctor() { + if (protonConnectionDelegate != null) { return protonConnectionDelegate.getExecutor(); } - else - { + else { return null; } } @Override - public void setConnection(AMQPConnectionContext connection) - { + public void setConnection(AMQPConnectionContext connection) { this.amqpConnection = connection; } @Override - public AMQPConnectionContext getConnection() - { + public AMQPConnectionContext getConnection() { return amqpConnection; } - public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() - { + public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() { return protonConnectionDelegate; } - public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) - { + public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) { this.protonConnectionDelegate = protonConnectionDelegate; } - public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) - { + public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) { final int size = byteBuf.writerIndex(); latch.countUp(); - connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() - { + connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception - { + public void operationComplete(ChannelFuture future) throws Exception { latch.countDown(); } }); - if (amqpConnection.isSyncOnFlush()) - { - try - { + if (amqpConnection.isSyncOnFlush()) { + try { latch.await(5, TimeUnit.SECONDS); } - catch (Exception e) - { + catch (Exception e) { e.printStackTrace(); } } @@ -127,10 +111,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback amqpConnection.outputDone(size); } - @Override - public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) - { + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { return new ProtonSessionIntegrationCallback(this, manager, connection); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index f74d6d7..88506b6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.proton.plug; - import java.util.concurrent.Executor; import io.netty.buffer.ByteBuf; @@ -49,8 +48,8 @@ import org.proton.plug.SASLResult; import org.proton.plug.context.ProtonPlugSender; import org.proton.plug.sasl.PlainSASLResult; -public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback -{ +public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback { + protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); private final ActiveMQProtonConnectionCallback protonSPI; @@ -63,23 +62,22 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private AMQPSessionContext protonSession; - public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection) - { + public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, + ProtonProtocolManager manager, + AMQPConnectionContext connection) { this.protonSPI = protonSPI; this.manager = manager; this.connection = connection; } @Override - public void onFlowConsumer(Object consumer, int credits) - { + public void onFlowConsumer(Object consumer, int credits) { // We have our own flow control on AMQP, so we set activemq's flow control to 0 ((ServerConsumer) consumer).receiveCredits(-1); } @Override - public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception - { + public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception { this.protonSession = protonSession; @@ -87,39 +85,31 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se String user = null; String passcode = null; - if (saslResult != null) - { + if (saslResult != null) { user = saslResult.getUser(); - if (saslResult instanceof PlainSASLResult) - { - passcode = ((PlainSASLResult)saslResult).getPassword(); + if (saslResult instanceof PlainSASLResult) { + passcode = ((PlainSASLResult) saslResult).getPassword(); } } - serverSession = manager.getServer().createSession(name, - user, - passcode, - ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection, + serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection, false, // boolean autoCommitSends false, // boolean autoCommitAcks, false, // boolean preAcknowledge, true, //boolean xa, - (String) null, - this, - null, - true); + (String) null, this, null, true); } @Override - public void start() - { + public void start() { } @Override - public Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception - { + public Object createSender(ProtonPlugSender protonSender, + String queue, + String filer, + boolean browserOnly) throws Exception { long consumerID = consumerIDGenerator.generateID(); ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filer), browserOnly); @@ -133,39 +123,32 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void startSender(Object brokerConsumer) throws Exception - { + public void startSender(Object brokerConsumer) throws Exception { ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer; // flow control is done at proton serverConsumer.receiveCredits(-1); } @Override - public void createTemporaryQueue(String queueName) throws Exception - { + public void createTemporaryQueue(String queueName) throws Exception { serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false); } @Override - public boolean queueQuery(String queueName) throws Exception - { + public boolean queueQuery(String queueName) throws Exception { boolean queryResult = false; QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); - if (queueQuery.isExists()) - { + if (queueQuery.isExists()) { queryResult = true; } - else - { - if (queueQuery.isAutoCreateJmsQueues()) - { + else { + if (queueQuery.isAutoCreateJmsQueues()) { serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); queryResult = true; } - else - { + else { queryResult = false; } } @@ -174,124 +157,104 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void closeSender(final Object brokerConsumer) throws Exception - { - Runnable runnable = new Runnable() - { + public void closeSender(final Object brokerConsumer) throws Exception { + Runnable runnable = new Runnable() { @Override - public void run() - { - try - { + public void run() { + try { ((ServerConsumer) brokerConsumer).close(false); } - catch (Exception e) - { + catch (Exception e) { } } }; - // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol) // to avoid deadlocks the close has to be done outside of the main thread on an executor // otherwise you could get a deadlock Executor executor = protonSPI.getExeuctor(); - if (executor != null) - { + if (executor != null) { executor.execute(runnable); } - else - { + else { runnable.run(); } } @Override - public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception - { + public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception { return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount); } @Override - public Binary getCurrentTXID() - { + public Binary getCurrentTXID() { return new Binary(ByteUtil.longToBytes(serverSession.getCurrentTransaction().getID())); } @Override - public String tempQueueName() - { + public String tempQueueName() { return UUIDGenerator.getInstance().generateStringUUID(); } @Override - public void commitCurrentTX() throws Exception - { + public void commitCurrentTX() throws Exception { serverSession.commit(); } @Override - public void rollbackCurrentTX() throws Exception - { + public void rollbackCurrentTX() throws Exception { serverSession.rollback(false); } @Override - public void close() throws Exception - { + public void close() throws Exception { serverSession.close(false); } @Override - public void ack(Object brokerConsumer, Object message) throws Exception - { - ((ServerConsumer)brokerConsumer).individualAcknowledge(null, ((ServerMessage)message).getMessageID()); + public void ack(Object brokerConsumer, Object message) throws Exception { + ((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID()); } @Override - public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception - { - ((ServerConsumer)brokerConsumer).individualCancel(((ServerMessage)message).getMessageID(), updateCounts); + public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception { + ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts); } @Override - public void resumeDelivery(Object consumer) - { + public void resumeDelivery(Object consumer) { ((ServerConsumer) consumer).receiveCredits(-1); } @Override - public void serverSend(final Receiver receiver, final Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception - { + public void serverSend(final Receiver receiver, + final Delivery delivery, + String address, + int messageFormat, + ByteBuf messageEncoded) throws Exception { EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex()); ServerMessage message = manager.getConverter().inbound(encodedMessage); //use the address on the receiver if not null, if null let's hope it was set correctly on the message - if (address != null) - { + if (address != null) { message.setAddress(new SimpleString(address)); } serverSession.send(message, false); - manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() - { + manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @Override - public void done() - { - synchronized (connection.getLock()) - { + public void done() { + synchronized (connection.getLock()) { delivery.settle(); connection.flush(); } } @Override - public void onError(int errorCode, String errorMessage) - { - synchronized (connection.getLock()) - { + public void onError(int errorCode, String errorMessage) { + synchronized (connection.getLock()) { receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); connection.flush(); } @@ -299,31 +262,24 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se }); } - @Override - public void sendProducerCreditsMessage(int credits, SimpleString address) - { + public void sendProducerCreditsMessage(int credits, SimpleString address) { } @Override - public void sendProducerCreditsFailMessage(int credits, SimpleString address) - { + public void sendProducerCreditsFailMessage(int credits, SimpleString address) { } @Override - public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) - { + public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); - try - { + try { return plugSender.deliverMessage(message, deliveryCount); } - catch (Exception e) - { - synchronized (connection.getLock()) - { + catch (Exception e) { + synchronized (connection.getLock()) { plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); connection.flush(); } @@ -333,59 +289,50 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) - { + public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { return 0; } @Override - public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) - { + public int sendLargeMessageContinuation(ServerConsumer consumer, + byte[] body, + boolean continues, + boolean requiresResponse) { return 0; } @Override - public void closed() - { + public void closed() { } @Override - public void addReadyListener(ReadyListener listener) - { + public void addReadyListener(ReadyListener listener) { } @Override - public void removeReadyListener(ReadyListener listener) - { + public void removeReadyListener(ReadyListener listener) { } @Override - public void disconnect(ServerConsumer consumer, String queueName) - { - synchronized (connection.getLock()) - { + public void disconnect(ServerConsumer consumer, String queueName) { + synchronized (connection.getLock()) { ((Link) consumer.getProtocolContext()).close(); connection.flush(); } } - @Override - public boolean hasCredits(ServerConsumer consumer) - { + public boolean hasCredits(ServerConsumer consumer) { ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); - if (plugSender != null && plugSender.getSender().getCredit() > 0) - { + if (plugSender != null && plugSender.getSender().getCredit() > 0) { return true; } - else - { + else { return false; } } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java index 167925e..c60e9cd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java @@ -20,29 +20,23 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.proton.plug.sasl.ServerSASLPlain; -public class ActiveMQPlainSASL extends ServerSASLPlain -{ +public class ActiveMQPlainSASL extends ServerSASLPlain { private final ActiveMQSecurityManager securityManager; private final SecurityStore securityStore; - - public ActiveMQPlainSASL(SecurityStore securityStore, ActiveMQSecurityManager securityManager) - { + public ActiveMQPlainSASL(SecurityStore securityStore, ActiveMQSecurityManager securityManager) { this.securityManager = securityManager; this.securityStore = securityStore; } @Override - protected boolean authenticate(String user, String password) - { - if (securityStore.isSecurityEnabled()) - { + protected boolean authenticate(String user, String password) { + if (securityStore.isSecurityEnabled()) { return securityManager.validateUser(user, password); } - else - { + else { return true; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java index 07a219a..0b5cb51 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java @@ -48,12 +48,10 @@ import org.junit.Assert; import org.junit.Test; import org.proton.plug.util.NettyWritable; -public class TestConversions extends Assert -{ +public class TestConversions extends Assert { @Test - public void testSimpleConversionBytes() throws Exception - { + public void testSimpleConversionBytes() throws Exception { Map mapprop = createPropertiesMap(); ApplicationProperties properties = new ApplicationProperties(mapprop); MessageImpl message = (MessageImpl) Message.Factory.create(); @@ -61,48 +59,40 @@ public class TestConversions extends Assert byte[] bodyBytes = new byte[4]; - for (int i = 0; i < bodyBytes.length; i++) - { + for (int i = 0; i < bodyBytes.length; i++) { bodyBytes[i] = (byte) 0xff; } message.setBody(new Data(new Binary(bodyBytes))); - EncodedMessage encodedMessage = encodeMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSBytesMessage serverMessage = (ServerJMSBytesMessage)converter.inboundJMSType(encodedMessage); + ServerJMSBytesMessage serverMessage = (ServerJMSBytesMessage) converter.inboundJMSType(encodedMessage); verifyProperties(serverMessage); - assertEquals(bodyBytes.length, serverMessage.getBodyLength()); byte[] newBodyBytes = new byte[4]; serverMessage.readBytes(newBodyBytes); - Assert.assertArrayEquals(bodyBytes, newBodyBytes); - - Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); System.out.println("output = " + obj); } - private void verifyProperties(javax.jms.Message message) throws Exception - { + private void verifyProperties(javax.jms.Message message) throws Exception { assertEquals(true, message.getBooleanProperty("true")); assertEquals(false, message.getBooleanProperty("false")); assertEquals("bar", message.getStringProperty("foo")); } - private Map createPropertiesMap() - { + private Map createPropertiesMap() { Map mapprop = new HashMap<>(); mapprop.put("true", Boolean.TRUE); @@ -112,8 +102,7 @@ public class TestConversions extends Assert } @Test - public void testSimpleConversionMap() throws Exception - { + public void testSimpleConversionMap() throws Exception { Map mapprop = createPropertiesMap(); ApplicationProperties properties = new ApplicationProperties(mapprop); MessageImpl message = (MessageImpl) Message.Factory.create(); @@ -128,33 +117,29 @@ public class TestConversions extends Assert EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSMapMessage serverMessage = (ServerJMSMapMessage)converter.inboundJMSType(encodedMessage); + ServerJMSMapMessage serverMessage = (ServerJMSMapMessage) converter.inboundJMSType(encodedMessage); verifyProperties(serverMessage); Assert.assertEquals(1, serverMessage.getInt("someint")); Assert.assertEquals("value", serverMessage.getString("somestr")); - Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); reEncodeMsg(obj); - MessageImpl outMessage = (MessageImpl) obj; - AmqpValue value = (AmqpValue)outMessage.getBody(); - Map mapoutput = (Map)value.getValue(); + AmqpValue value = (AmqpValue) outMessage.getBody(); + Map mapoutput = (Map) value.getValue(); assertEquals(Integer.valueOf(1), (Integer) mapoutput.get("someint")); - System.out.println("output = " + obj); } - @Test - public void testSimpleConversionStream() throws Exception - { + public void testSimpleConversionStream() throws Exception { Map mapprop = createPropertiesMap(); ApplicationProperties properties = new ApplicationProperties(mapprop); MessageImpl message = (MessageImpl) Message.Factory.create(); @@ -169,7 +154,7 @@ public class TestConversions extends Assert EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage)converter.inboundJMSType(encodedMessage); + ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage) converter.inboundJMSType(encodedMessage); simulatePersistence(serverMessage); @@ -180,20 +165,19 @@ public class TestConversions extends Assert assertEquals(10, serverMessage.readInt()); assertEquals("10", serverMessage.readString()); - Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); reEncodeMsg(obj); - MessageImpl outMessage = (MessageImpl)obj; - List list = ((AmqpSequence)outMessage.getBody()).getValue(); + MessageImpl outMessage = (MessageImpl) obj; + List list = ((AmqpSequence) outMessage.getBody()).getValue(); Assert.assertEquals(Integer.valueOf(10), list.get(0)); Assert.assertEquals("10", list.get(1)); } @Test - public void testSimpleConversionText() throws Exception - { + public void testSimpleConversionText() throws Exception { Map mapprop = createPropertiesMap(); ApplicationProperties properties = new ApplicationProperties(mapprop); MessageImpl message = (MessageImpl) Message.Factory.create(); @@ -205,24 +189,21 @@ public class TestConversions extends Assert EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSTextMessage serverMessage = (ServerJMSTextMessage)converter.inboundJMSType(encodedMessage); + ServerJMSTextMessage serverMessage = (ServerJMSTextMessage) converter.inboundJMSType(encodedMessage); simulatePersistence(serverMessage); - verifyProperties(serverMessage); Assert.assertEquals(text, serverMessage.getText()); - - Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); - + Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); reEncodeMsg(obj); MessageImpl outMessage = (MessageImpl) obj; - AmqpValue value = (AmqpValue)outMessage.getBody(); - String textValue = (String)value.getValue(); + AmqpValue value = (AmqpValue) outMessage.getBody(); + String textValue = (String) value.getValue(); Assert.assertEquals(text, textValue); @@ -230,17 +211,15 @@ public class TestConversions extends Assert } - private void simulatePersistence(ServerJMSMessage serverMessage) - { + private void simulatePersistence(ServerJMSMessage serverMessage) { serverMessage.getInnerMessage().setAddress(new SimpleString("jms.queue.SomeAddress")); // This is just to simulate what would happen during the persistence of the message // We need to still be able to recover the message when we read it back - ((EncodingSupport)serverMessage.getInnerMessage()).encode(new EmptyBuffer()); + ((EncodingSupport) serverMessage.getInnerMessage()).encode(new EmptyBuffer()); } - private ProtonJMessage reEncodeMsg(Object obj) - { - ProtonJMessage objOut = (ProtonJMessage)obj; + private ProtonJMessage reEncodeMsg(Object obj) { + ProtonJMessage objOut = (ProtonJMessage) obj; ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); @@ -248,9 +227,7 @@ public class TestConversions extends Assert return objOut; } - - private EncodedMessage encodeMessage(MessageImpl message) - { + private EncodedMessage encodeMessage(MessageImpl message) { ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024); message.encode(new NettyWritable(buf)); byte[] bytesConvert = new byte[buf.writerIndex()]; @@ -258,582 +235,485 @@ public class TestConversions extends Assert return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length); } + class EmptyBuffer implements ActiveMQBuffer { - class EmptyBuffer implements ActiveMQBuffer - { @Override - public ByteBuf byteBuf() - { + public ByteBuf byteBuf() { return null; } @Override - public int capacity() - { + public int capacity() { return 0; } @Override - public int readerIndex() - { + public int readerIndex() { return 0; } @Override - public void readerIndex(int readerIndex) - { + public void readerIndex(int readerIndex) { } @Override - public int writerIndex() - { + public int writerIndex() { return 0; } @Override - public void writerIndex(int writerIndex) - { + public void writerIndex(int writerIndex) { } @Override - public void setIndex(int readerIndex, int writerIndex) - { + public void setIndex(int readerIndex, int writerIndex) { } @Override - public int readableBytes() - { + public int readableBytes() { return 0; } @Override - public int writableBytes() - { + public int writableBytes() { return 0; } @Override - public boolean readable() - { + public boolean readable() { return false; } @Override - public boolean writable() - { + public boolean writable() { return false; } @Override - public void clear() - { + public void clear() { } @Override - public void markReaderIndex() - { + public void markReaderIndex() { } @Override - public void resetReaderIndex() - { + public void resetReaderIndex() { } @Override - public void markWriterIndex() - { + public void markWriterIndex() { } @Override - public void resetWriterIndex() - { + public void resetWriterIndex() { } @Override - public void discardReadBytes() - { + public void discardReadBytes() { } @Override - public byte getByte(int index) - { + public byte getByte(int index) { return 0; } @Override - public short getUnsignedByte(int index) - { + public short getUnsignedByte(int index) { return 0; } @Override - public short getShort(int index) - { + public short getShort(int index) { return 0; } @Override - public int getUnsignedShort(int index) - { + public int getUnsignedShort(int index) { return 0; } @Override - public int getInt(int index) - { + public int getInt(int index) { return 0; } @Override - public long getUnsignedInt(int index) - { + public long getUnsignedInt(int index) { return 0; } @Override - public long getLong(int index) - { + public long getLong(int index) { return 0; } @Override - public void getBytes(int index, ActiveMQBuffer dst) - { + public void getBytes(int index, ActiveMQBuffer dst) { } @Override - public void getBytes(int index, ActiveMQBuffer dst, int length) - { + public void getBytes(int index, ActiveMQBuffer dst, int length) { } @Override - public void getBytes(int index, ActiveMQBuffer dst, int dstIndex, int length) - { + public void getBytes(int index, ActiveMQBuffer dst, int dstIndex, int length) { } @Override - public void getBytes(int index, byte[] dst) - { + public void getBytes(int index, byte[] dst) { } @Override - public void getBytes(int index, byte[] dst, int dstIndex, int length) - { + public void getBytes(int index, byte[] dst, int dstIndex, int length) { } @Override - public void getBytes(int index, ByteBuffer dst) - { + public void getBytes(int index, ByteBuffer dst) { } @Override - public char getChar(int index) - { + public char getChar(int index) { return 0; } @Override - public float getFloat(int index) - { + public float getFloat(int index) { return 0; } @Override - public double getDouble(int index) - { + public double getDouble(int index) { return 0; } @Override - public void setByte(int index, byte value) - { + public void setByte(int index, byte value) { } @Override - public void setShort(int index, short value) - { + public void setShort(int index, short value) { } @Override - public void setInt(int index, int value) - { + public void setInt(int index, int value) { } @Override - public void setLong(int index, long value) - { + public void setLong(int index, long value) { } @Override - public void setBytes(int index, ActiveMQBuffer src) - { + public void setBytes(int index, ActiveMQBuffer src) { } @Override - public void setBytes(int index, ActiveMQBuffer src, int length) - { + public void setBytes(int index, ActiveMQBuffer src, int length) { } @Override - public void setBytes(int index, ActiveMQBuffer src, int srcIndex, int length) - { + public void setBytes(int index, ActiveMQBuffer src, int srcIndex, int length) { } @Override - public void setBytes(int index, byte[] src) - { + public void setBytes(int index, byte[] src) { } @Override - public void setBytes(int index, byte[] src, int srcIndex, int length) - { + public void setBytes(int index, byte[] src, int srcIndex, int length) { } @Override - public void setBytes(int index, ByteBuffer src) - { + public void setBytes(int index, ByteBuffer src) { } @Override - public void setChar(int index, char value) - { + public void setChar(int index, char value) { } @Override - public void setFloat(int index, float value) - { + public void setFloat(int index, float value) { } @Override - public void setDouble(int index, double value) - { + public void setDouble(int index, double value) { } @Override - public byte readByte() - { + public byte readByte() { return 0; } @Override - public short readUnsignedByte() - { + public short readUnsignedByte() { return 0; } @Override - public short readShort() - { + public short readShort() { return 0; } @Override - public int readUnsignedShort() - { + public int readUnsignedShort() { return 0; } @Override - public int readInt() - { + public int readInt() { return 0; } @Override - public long readUnsignedInt() - { + public long readUnsignedInt() { return 0; } @Override - public long readLong() - { + public long readLong() { return 0; } @Override - public char readChar() - { + public char readChar() { return 0; } @Override - public float readFloat() - { + public float readFloat() { return 0; } @Override - public double readDouble() - { + public double readDouble() { return 0; } @Override - public boolean readBoolean() - { + public boolean readBoolean() { return false; } @Override - public SimpleString readNullableSimpleString() - { + public SimpleString readNullableSimpleString() { return null; } @Override - public String readNullableString() - { + public String readNullableString() { return null; } @Override - public SimpleString readSimpleString() - { + public SimpleString readSimpleString() { return null; } @Override - public String readString() - { + public String readString() { return null; } @Override - public String readUTF() - { + public String readUTF() { return null; } @Override - public ActiveMQBuffer readBytes(int length) - { + public ActiveMQBuffer readBytes(int length) { return null; } @Override - public ActiveMQBuffer readSlice(int length) - { + public ActiveMQBuffer readSlice(int length) { return null; } @Override - public void readBytes(ActiveMQBuffer dst) - { + public void readBytes(ActiveMQBuffer dst) { } @Override - public void readBytes(ActiveMQBuffer dst, int length) - { + public void readBytes(ActiveMQBuffer dst, int length) { } @Override - public void readBytes(ActiveMQBuffer dst, int dstIndex, int length) - { + public void readBytes(ActiveMQBuffer dst, int dstIndex, int length) { } @Override - public void readBytes(byte[] dst) - { + public void readBytes(byte[] dst) { } @Override - public void readBytes(byte[] dst, int dstIndex, int length) - { + public void readBytes(byte[] dst, int dstIndex, int length) { } @Override - public void readBytes(ByteBuffer dst) - { + public void readBytes(ByteBuffer dst) { } @Override - public void skipBytes(int length) - { + public void skipBytes(int length) { } @Override - public void writeByte(byte value) - { + public void writeByte(byte value) { } @Override - public void writeShort(short value) - { + public void writeShort(short value) { } @Override - public void writeInt(int value) - { + public void writeInt(int value) { } @Override - public void writeLong(long value) - { + public void writeLong(long value) { } @Override - public void writeChar(char chr) - { + public void writeChar(char chr) { } @Override - public void writeFloat(float value) - { + public void writeFloat(float value) { } @Override - public void writeDouble(double value) - { + public void writeDouble(double value) { } @Override - public void writeBoolean(boolean val) - { + public void writeBoolean(boolean val) { } @Override - public void writeNullableSimpleString(SimpleString val) - { + public void writeNullableSimpleString(SimpleString val) { } @Override - public void writeNullableString(String val) - { + public void writeNullableString(String val) { } @Override - public void writeSimpleString(SimpleString val) - { + public void writeSimpleString(SimpleString val) { } @Override - public void writeString(String val) - { + public void writeString(String val) { } @Override - public void writeUTF(String utf) - { + public void writeUTF(String utf) { } @Override - public void writeBytes(ActiveMQBuffer src, int length) - { + public void writeBytes(ActiveMQBuffer src, int length) { } @Override - public void writeBytes(ActiveMQBuffer src, int srcIndex, int length) - { + public void writeBytes(ActiveMQBuffer src, int srcIndex, int length) { } @Override - public void writeBytes(byte[] src) - { + public void writeBytes(byte[] src) { } @Override - public void writeBytes(byte[] src, int srcIndex, int length) - { + public void writeBytes(byte[] src, int srcIndex, int length) { } @Override - public void writeBytes(ByteBuffer src) - { + public void writeBytes(ByteBuffer src) { } @Override - public ActiveMQBuffer copy() - { + public ActiveMQBuffer copy() { return null; } @Override - public ActiveMQBuffer copy(int index, int length) - { + public ActiveMQBuffer copy(int index, int length) { return null; } @Override - public ActiveMQBuffer slice() - { + public ActiveMQBuffer slice() { return null; } @Override - public ActiveMQBuffer slice(int index, int length) - { + public ActiveMQBuffer slice(int index, int length) { return null; } @Override - public ActiveMQBuffer duplicate() - { + public ActiveMQBuffer duplicate() { return null; } @Override - public ByteBuffer toByteBuffer() - { + public ByteBuffer toByteBuffer() { return null; } @Override - public ByteBuffer toByteBuffer(int index, int length) - { + public ByteBuffer toByteBuffer(int index, int length) { return null; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java index b2c9d5c..cb47e85 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java @@ -31,12 +31,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -public class HQPropertiesConversionInterceptor implements Interceptor -{ +public class HQPropertiesConversionInterceptor implements Interceptor { + private static Map dictionary; - static - { + static { Map d = new HashMap(); // Add entries for outgoing messages @@ -67,35 +66,29 @@ public class HQPropertiesConversionInterceptor implements Interceptor } @Override - public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException - { - if (isMessagePacket(packet)) - { + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (isMessagePacket(packet)) { handleReceiveMessage((MessagePacket) packet); } return true; } - private void handleReceiveMessage(MessagePacket messagePacket) - { + private void handleReceiveMessage(MessagePacket messagePacket) { Message message = messagePacket.getMessage(); // We are modifying the key set so we iterate over a shallow copy. - for (SimpleString property : new HashSet<>(message.getPropertyNames())) - { - if (dictionary.containsKey(property)) - { + for (SimpleString property : new HashSet<>(message.getPropertyNames())) { + if (dictionary.containsKey(property)) { message.putObjectProperty(dictionary.get(property), message.removeProperty(property)); } } } - private boolean isMessagePacket(Packet packet) - { + private boolean isMessagePacket(Packet packet) { int type = packet.getType(); return type == PacketImpl.SESS_SEND || - type == PacketImpl.SESS_SEND_CONTINUATION || - type == PacketImpl.SESS_SEND_LARGE || - type == PacketImpl.SESS_RECEIVE_LARGE_MSG || - type == PacketImpl.SESS_RECEIVE_MSG; + type == PacketImpl.SESS_SEND_CONTINUATION || + type == PacketImpl.SESS_SEND_LARGE || + type == PacketImpl.SESS_RECEIVE_LARGE_MSG || + type == PacketImpl.SESS_RECEIVE_MSG; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java index c40bbd9..3d6dab5 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java @@ -29,16 +29,17 @@ import java.util.List; /** * HornetQ Protocol Manager */ -class HornetQProtocolManager extends CoreProtocolManager -{ - HornetQProtocolManager(CoreProtocolManagerFactory factory, ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors) - { +class HornetQProtocolManager extends CoreProtocolManager { + + HornetQProtocolManager(CoreProtocolManagerFactory factory, + ActiveMQServer server, + List incomingInterceptors, + List outgoingInterceptors) { super(factory, server, incomingInterceptors, outgoingInterceptors); } @Override - public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) - { + public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { //if we are not an old client then handshake if (buffer.getByte(0) == 'H' && buffer.getByte(1) == 'O' && @@ -46,16 +47,14 @@ class HornetQProtocolManager extends CoreProtocolManager buffer.getByte(3) == 'N' && buffer.getByte(4) == 'E' && buffer.getByte(5) == 'T' && - buffer.getByte(6) == 'Q') - { + buffer.getByte(6) == 'Q') { //todo add some handshaking buffer.readBytes(7); } } @Override - public boolean isProtocol(byte[] array) - { + public boolean isProtocol(byte[] array) { String frameStart = new String(array, StandardCharsets.US_ASCII); return frameStart.startsWith("HORNETQ"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java index 1574c07..a163459 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java @@ -18,22 +18,22 @@ package org.apache.activemq.artemis.core.protocol.hornetq; import java.util.List; - import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; -public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory -{ +public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory { + public static final String HORNETQ_PROTOCOL_NAME = "HORNETQ"; private static final String MODULE_NAME = "artemis-hornetq-protocol"; private static String[] SUPPORTED_PROTOCOLS = {HORNETQ_PROTOCOL_NAME}; - public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) - { + public ProtocolManager createProtocolManager(final ActiveMQServer server, + final List incomingInterceptors, + List outgoingInterceptors) { Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor(); incomingInterceptors.add(propertyConversionInterceptor); outgoingInterceptors.add(propertyConversionInterceptor); @@ -41,14 +41,12 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory } @Override - public String[] getProtocols() - { + public String[] getProtocols() { return SUPPORTED_PROTOCOLS; } @Override - public String getModuleName() - { + public String getModuleName() { return MODULE_NAME; } }