Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6FCC4200C3A for ; Thu, 2 Mar 2017 16:05:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6E78D160B84; Thu, 2 Mar 2017 15:05:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 972E8160B91 for ; Thu, 2 Mar 2017 16:05:37 +0100 (CET) Received: (qmail 57069 invoked by uid 500); 2 Mar 2017 15:05:36 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56179 invoked by uid 99); 2 Mar 2017 15:05:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Mar 2017 15:05:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D479DF0DBE; Thu, 2 Mar 2017 15:05:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 02 Mar 2017 15:06:01 -0000 Message-Id: <71e5c8b66ab64ab8a1a4115e589ddb38@git.apache.org> In-Reply-To: <549cadbf627048ab90dbf12bb24d5fcc@git.apache.org> References: <549cadbf627048ab90dbf12bb24d5fcc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/29] activemq-artemis git commit: Fixing tests (mqtt) archived-at: Thu, 02 Mar 2017 15:05:38 -0000 Fixing tests (mqtt) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5acb6f5a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5acb6f5a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5acb6f5a Branch: refs/heads/artemis-1009 Commit: 5acb6f5a8e855f2e27e59461a996da37848479c1 Parents: c58a3d0 Author: Clebert Suconic Authored: Wed Mar 1 15:28:33 2017 -0500 Committer: Clebert Suconic Committed: Thu Mar 2 10:05:21 2017 -0500 ---------------------------------------------------------------------- .../activemq/artemis/core/message/impl/CoreMessage.java | 2 ++ .../artemis/core/protocol/mqtt/MQTTPublishManager.java | 10 +++++++--- .../artemis/core/protocol/mqtt/MQTTSessionCallback.java | 2 +- .../activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 +--- 4 files changed, 11 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5acb6f5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 973c1de..b1bad5a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -131,6 +131,7 @@ public class CoreMessage extends RefCountMessage { @Override public ActiveMQBuffer getReadOnlyBodyBuffer() { + checkEncode(); internalWritableBuffer(); return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly()); } @@ -243,6 +244,7 @@ public class CoreMessage extends RefCountMessage { @Override public Message copy() { + checkEncode(); return new CoreMessage(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5acb6f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index e619eb9..67ef258 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -32,12 +32,15 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.jboss.logging.Logger; /** * Handles MQTT Exactly Once (QoS level 2) Protocol. */ public class MQTTPublishManager { + private static final Logger logger = Logger.getLogger(MQTTPublishManager.class); + private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; private SimpleString managementAddress; @@ -173,6 +176,7 @@ public class MQTTPublishManager { } tx.commit(); } catch (Throwable t) { + logger.warn(t.getMessage(), t); tx.rollback(); throw t; } @@ -253,17 +257,17 @@ public class MQTTPublishManager { switch (message.getType()) { case Message.TEXT_TYPE: try { - SimpleString text = message.getBodyBuffer().readNullableSimpleString(); + SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString(); byte[] stringPayload = text.toString().getBytes("UTF-8"); payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload.writeBytes(stringPayload); break; } catch (UnsupportedEncodingException e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } default: ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer(); - payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf(); + payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); break; } session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5acb6f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index b997d80..a5b908f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback { try { session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount); } catch (Exception e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } return 1; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5acb6f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 6891497..e7b8c50 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -114,8 +113,7 @@ public class MQTTUtil { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); - // FIXME does this involve a copy? - message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes()); + message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); return message; }