From commits-return-50246-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Tue Jan 23 00:06:09 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 2BAD118079E for ; Tue, 23 Jan 2018 00:06:09 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 18DED160C4C; Mon, 22 Jan 2018 23:06:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8D264160C4D for ; Tue, 23 Jan 2018 00:06:07 +0100 (CET) Received: (qmail 74001 invoked by uid 500); 22 Jan 2018 23:06:06 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 73980 invoked by uid 99); 22 Jan 2018 23:06:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jan 2018 23:06:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8314FF3534; Mon, 22 Jan 2018 23:06:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 22 Jan 2018 23:06:06 -0000 Message-Id: <4adf6e8a725a4f0aa1857b4a839274ab@git.apache.org> In-Reply-To: <96a846b866664ceab12ccdfcfd3f6c50@git.apache.org> References: <96a846b866664ceab12ccdfcfd3f6c50@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements ARTEMIS-1616 OpenWire improvements Refactored OpenWireMessageConverter::toAMQMessage into smaller methods Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6b6dd95 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6b6dd95 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6b6dd95 Branch: refs/heads/master Commit: c6b6dd95d1665230d667557df240d8a62a2118af Parents: e7a1dca Author: Francesco Nigro Authored: Wed Jan 17 14:37:08 2018 +0100 Committer: Clebert Suconic Committed: Mon Jan 22 18:02:03 2018 -0500 ---------------------------------------------------------------------- .../openwire/OpenWireMessageConverter.java | 562 +++++++++++-------- 1 file changed, 316 insertions(+), 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6b6dd95/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 0948f8a..3dc4a4e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter 0) { - mapData.decode(buffer.byteBuf()); - Map map = mapData.getMap(); - ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); - OutputStream os = out; - if (isCompressed) { - os = new DeflaterOutputStream(os, true); - } - try (DataOutputStream dataOut = new DataOutputStream(os)) { - MarshallingSupport.marshalPrimitiveMap(map, dataOut); - dataOut.flush(); - } - bytes = out.toByteArray(); - } - - } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { - if (buffer.readableBytes() > 0) { - int len = buffer.readInt(); - bytes = new byte[len]; - buffer.readBytes(bytes); - if (isCompressed) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); - } - bytes = bytesOut.toByteArray(); - } - } - } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { - org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); - OutputStream out = bytesOut; - if (isCompressed) { - out = new DeflaterOutputStream(bytesOut, true); - } - try (DataOutputStream dataOut = new DataOutputStream(out)) { - - boolean stop = false; - while (!stop && buffer.readable()) { - byte primitiveType = buffer.readByte(); - switch (primitiveType) { - case DataConstants.BOOLEAN: - MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); - break; - case DataConstants.BYTE: - MarshallingSupport.marshalByte(dataOut, buffer.readByte()); - break; - case DataConstants.BYTES: - int len = buffer.readInt(); - byte[] bytesData = new byte[len]; - buffer.readBytes(bytesData); - MarshallingSupport.marshalByteArray(dataOut, bytesData); - break; - case DataConstants.CHAR: - char ch = (char) buffer.readShort(); - MarshallingSupport.marshalChar(dataOut, ch); - break; - case DataConstants.DOUBLE: - double doubleVal = Double.longBitsToDouble(buffer.readLong()); - MarshallingSupport.marshalDouble(dataOut, doubleVal); - break; - case DataConstants.FLOAT: - Float floatVal = Float.intBitsToFloat(buffer.readInt()); - MarshallingSupport.marshalFloat(dataOut, floatVal); - break; - case DataConstants.INT: - MarshallingSupport.marshalInt(dataOut, buffer.readInt()); - break; - case DataConstants.LONG: - MarshallingSupport.marshalLong(dataOut, buffer.readLong()); - break; - case DataConstants.SHORT: - MarshallingSupport.marshalShort(dataOut, buffer.readShort()); - break; - case DataConstants.STRING: - String string = buffer.readNullableString(); - if (string == null) { - MarshallingSupport.marshalNull(dataOut); - } else { - MarshallingSupport.marshalString(dataOut, string); - } - break; - default: - //now we stop - stop = true; - break; - } - dataOut.flush(); - } - } - bytes = bytesOut.toByteArray(); - } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { - int n = buffer.readableBytes(); - bytes = new byte[n]; - buffer.readBytes(bytes); - if (isCompressed) { - int length = bytes.length; - Deflater deflater = new Deflater(); - try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) { - compressed.write(new byte[4]); - deflater.setInput(bytes); - deflater.finish(); - byte[] bytesBuf = new byte[1024]; - while (!deflater.finished()) { - int count = deflater.deflate(bytesBuf); - compressed.write(bytesBuf, 0, count); - } - compressed.flush(); - ByteSequence byteSeq = compressed.toByteSequence(); - ByteSequenceData.writeIntBig(byteSeq, length); - bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); - } finally { - deflater.end(); - } - } - } else { - int n = buffer.readableBytes(); - bytes = new byte[n]; - buffer.readBytes(bytes); - if (isCompressed) { - try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); - bytes = bytesOut.toByteArray(); - } - } - } - - buffer.resetReaderIndex();// this is important for topics as the buffer - // may be read multiple times - } - //we need check null because messages may come from other clients //and those amq specific attribute may not be set. Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL); @@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter props = coreMessage.getPropertyNames(); + final Set props = coreMessage.getPropertyNames(); if (props != null) { - for (SimpleString s : props) { - String keyStr = s.toString(); - if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) && - !consumer.hasNotificationDestination()) { - continue; + setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer); + } + + if (bytes != null) { + ByteSequence content = new ByteSequence(bytes); + amqMsg.setContent(content); + } + return amqMsg; + } + + private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + byte[] bytes = null; + SimpleString text = buffer.readNullableSimpleString(); + if (text != null) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4); + OutputStream out = bytesOut; + if (isCompressed) { + out = new DeflaterOutputStream(out, true); + } + try (DataOutputStream dataOut = new DataOutputStream(out)) { + MarshallingSupport.writeUTF8(dataOut, text.toString()); + dataOut.flush(); + bytes = bytesOut.toByteArray(); + } + } + return bytes; + } + + private static byte[] toAMQMessageMapType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + byte[] bytes = null; + //it could be a null map + if (buffer.readableBytes() > 0) { + TypedProperties mapData = new TypedProperties(); + mapData.decode(buffer.byteBuf()); + Map map = mapData.getMap(); + ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); + OutputStream os = out; + if (isCompressed) { + os = new DeflaterOutputStream(os, true); + } + try (DataOutputStream dataOut = new DataOutputStream(os)) { + MarshallingSupport.marshalPrimitiveMap(map, dataOut); + dataOut.flush(); + } + bytes = out.toByteArray(); + } + return bytes; + } + + private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + byte[] bytes = null; + if (buffer.readableBytes() > 0) { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); } - Object prop = coreMessage.getObjectProperty(s); - try { - if (prop instanceof SimpleString) { - amqMsg.setObjectProperty(s.toString(), prop.toString()); - } else { - if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) { - Long l = (Long) prop; - amqMsg.setObjectProperty(s.toString(), l.intValue()); + bytes = bytesOut.toByteArray(); + } + } + return bytes; + } + + private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + byte[] bytes; + org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); + OutputStream out = bytesOut; + if (isCompressed) { + out = new DeflaterOutputStream(bytesOut, true); + } + try (DataOutputStream dataOut = new DataOutputStream(out)) { + + boolean stop = false; + while (!stop && buffer.readable()) { + byte primitiveType = buffer.readByte(); + switch (primitiveType) { + case DataConstants.BOOLEAN: + MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); + break; + case DataConstants.BYTE: + MarshallingSupport.marshalByte(dataOut, buffer.readByte()); + break; + case DataConstants.BYTES: + int len = buffer.readInt(); + byte[] bytesData = new byte[len]; + buffer.readBytes(bytesData); + MarshallingSupport.marshalByteArray(dataOut, bytesData); + break; + case DataConstants.CHAR: + char ch = (char) buffer.readShort(); + MarshallingSupport.marshalChar(dataOut, ch); + break; + case DataConstants.DOUBLE: + double doubleVal = Double.longBitsToDouble(buffer.readLong()); + MarshallingSupport.marshalDouble(dataOut, doubleVal); + break; + case DataConstants.FLOAT: + Float floatVal = Float.intBitsToFloat(buffer.readInt()); + MarshallingSupport.marshalFloat(dataOut, floatVal); + break; + case DataConstants.INT: + MarshallingSupport.marshalInt(dataOut, buffer.readInt()); + break; + case DataConstants.LONG: + MarshallingSupport.marshalLong(dataOut, buffer.readLong()); + break; + case DataConstants.SHORT: + MarshallingSupport.marshalShort(dataOut, buffer.readShort()); + break; + case DataConstants.STRING: + String string = buffer.readNullableString(); + if (string == null) { + MarshallingSupport.marshalNull(dataOut); } else { - amqMsg.setObjectProperty(s.toString(), prop); + MarshallingSupport.marshalString(dataOut, string); } - } - } catch (JMSException e) { - throw new IOException("exception setting property " + s + " : " + prop, e); + break; + default: + //now we stop + stop = true; + break; } + dataOut.flush(); } } + bytes = bytesOut.toByteArray(); + return bytes; + } - amqMsg.setCompressed(isCompressed); - if (bytes != null) { - ByteSequence content = new ByteSequence(bytes); - amqMsg.setContent(content); + private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + int n = buffer.readableBytes(); + byte[] bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + bytes = toAMQMessageCompressedBytesType(bytes); + } + return bytes; + } + + private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException { + int length = bytes.length; + Deflater deflater = new Deflater(); + try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) { + compressed.write(new byte[4]); + deflater.setInput(bytes); + deflater.finish(); + byte[] bytesBuf = new byte[1024]; + while (!deflater.finished()) { + int count = deflater.deflate(bytesBuf); + compressed.write(bytesBuf, 0, count); + } + compressed.flush(); + ByteSequence byteSeq = compressed.toByteSequence(); + ByteSequenceData.writeIntBig(byteSeq, length); + return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); + } finally { + deflater.end(); } - return amqMsg; } + private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + int n = buffer.readableBytes(); + byte[] bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + bytes = bytesOut.toByteArray(); + } + } + return bytes; + } + + private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) { + String[] brokers = brokerPath.split(","); + BrokerId[] bids = new BrokerId[brokers.length]; + for (int i = 0; i < bids.length; i++) { + bids[i] = new BrokerId(brokers[i]); + } + amqMsg.setBrokerPath(bids); + } + + private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) { + String[] cluster = clusterPath.split(","); + BrokerId[] bids = new BrokerId[cluster.length]; + for (int i = 0; i < bids.length; i++) { + bids[i] = new BrokerId(cluster[i]); + } + amqMsg.setCluster(bids); + } + + private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] dsBytes) throws IOException { + ByteSequence seq = new ByteSequence(dsBytes); + DataStructure ds = (DataStructure) marshaller.unmarshal(seq); + amqMsg.setDataStructure(ds); + } + + private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] origDestBytes) throws IOException { + ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes)); + amqMsg.setOriginalDestination(origDest); + } + + private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] origTxIdBytes) throws IOException { + TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes)); + amqMsg.setOriginalTransactionId(origTxId); + } + + private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] replyToBytes) throws IOException { + ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes)); + amqMsg.setReplyTo(replyTo); + } + + private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg, + final SimpleString dlqCause) throws IOException { + try { + amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString()); + } catch (JMSException e) { + throw new IOException("failure to set dlq property " + dlqCause, e); + } + } + + private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg, + final SimpleString lastValueProperty) throws IOException { + try { + amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString()); + } catch (JMSException e) { + throw new IOException("failure to set lvq property " + lastValueProperty, e); + } + } + + private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg, + final ICoreMessage coreMessage, + final Set props, + final AMQConsumer consumer) throws IOException { + for (SimpleString s : props) { + final String keyStr = s.toString(); + if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { + continue; + } + final Object prop = coreMessage.getObjectProperty(s); + try { + if (prop instanceof SimpleString) { + amqMsg.setObjectProperty(keyStr, prop.toString()); + } else { + if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) { + Long l = (Long) prop; + amqMsg.setObjectProperty(keyStr, l.intValue()); + } else { + amqMsg.setObjectProperty(keyStr, prop); + } + } + } catch (JMSException e) { + throw new IOException("exception setting property " + s + " : " + prop, e); + } + } + } }