Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0944B200C2A for ; Wed, 1 Mar 2017 21:20:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 07C1E160B70; Wed, 1 Mar 2017 20:20:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1A887160B56 for ; Wed, 1 Mar 2017 21:20:36 +0100 (CET) Received: (qmail 44411 invoked by uid 500); 1 Mar 2017 20:20:36 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 44402 invoked by uid 99); 1 Mar 2017 20:20:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Mar 2017 20:20:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2AC92DFDE6; Wed, 1 Mar 2017 20:20:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Message-Id: <9a91845f05af46f4ab3a1dee0d10c423@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq-artemis git commit: fix mqtt Date: Wed, 1 Mar 2017 20:20:36 +0000 (UTC) archived-at: Wed, 01 Mar 2017 20:20:38 -0000 Repository: activemq-artemis Updated Branches: refs/heads/artemis-1009 9cbff99cd -> 5f01fc59b fix mqtt Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5f01fc59 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5f01fc59 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5f01fc59 Branch: refs/heads/artemis-1009 Commit: 5f01fc59b0ca2bd954ad303a8796f2df261b0491 Parents: 9cbff99 Author: Clebert Suconic Authored: Wed Mar 1 15:02:22 2017 -0500 Committer: Clebert Suconic Committed: Wed Mar 1 15:20:11 2017 -0500 ---------------------------------------------------------------------- .../activemq/artemis/core/message/impl/CoreMessage.java | 1 + .../artemis/core/protocol/mqtt/MQTTPublishManager.java | 10 +++++++--- .../artemis/core/protocol/mqtt/MQTTSessionCallback.java | 2 +- .../activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 +--- 4 files changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 973c1de..0da9f03 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -243,6 +243,7 @@ public class CoreMessage extends RefCountMessage { @Override public Message copy() { + checkEncode(); return new CoreMessage(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index e619eb9..67ef258 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -32,12 +32,15 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.jboss.logging.Logger; /** * Handles MQTT Exactly Once (QoS level 2) Protocol. */ public class MQTTPublishManager { + private static final Logger logger = Logger.getLogger(MQTTPublishManager.class); + private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; private SimpleString managementAddress; @@ -173,6 +176,7 @@ public class MQTTPublishManager { } tx.commit(); } catch (Throwable t) { + logger.warn(t.getMessage(), t); tx.rollback(); throw t; } @@ -253,17 +257,17 @@ public class MQTTPublishManager { switch (message.getType()) { case Message.TEXT_TYPE: try { - SimpleString text = message.getBodyBuffer().readNullableSimpleString(); + SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString(); byte[] stringPayload = text.toString().getBytes("UTF-8"); payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload.writeBytes(stringPayload); break; } catch (UnsupportedEncodingException e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } default: ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer(); - payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf(); + payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); break; } session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index b997d80..a5b908f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback { try { session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount); } catch (Exception e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } return 1; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 6891497..e7b8c50 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -114,8 +113,7 @@ public class MQTTUtil { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); - // FIXME does this involve a copy? - message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes()); + message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); return message; }