activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: Fixing tests (mqtt) [Forced Update!]
Date Wed, 01 Mar 2017 20:28:56 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/artemis-1009 5f01fc59b -> 6e9c0eae1 (forced update)


Fixing tests (mqtt)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6e9c0eae
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6e9c0eae
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6e9c0eae

Branch: refs/heads/artemis-1009
Commit: 6e9c0eae1ec7ab8dd6d9cbdfe24f9aced64106e7
Parents: 9cbff99
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Mar 1 15:28:33 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Mar 1 15:28:33 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/core/message/impl/CoreMessage.java   |  2 ++
 .../artemis/core/protocol/mqtt/MQTTPublishManager.java    | 10 +++++++---
 .../artemis/core/protocol/mqtt/MQTTSessionCallback.java   |  2 +-
 .../activemq/artemis/core/protocol/mqtt/MQTTUtil.java     |  4 +---
 4 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e9c0eae/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 973c1de..b1bad5a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -131,6 +131,7 @@ public class CoreMessage extends RefCountMessage {
 
    @Override
    public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      checkEncode();
       internalWritableBuffer();
       return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0,
endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
    }
@@ -243,6 +244,7 @@ public class CoreMessage extends RefCountMessage {
 
    @Override
    public Message copy() {
+      checkEncode();
       return new CoreMessage(this);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e9c0eae/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e619eb9..67ef258 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -32,12 +32,15 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
 
 /**
  * Handles MQTT Exactly Once (QoS level 2) Protocol.
  */
 public class MQTTPublishManager {
 
+   private static final Logger logger = Logger.getLogger(MQTTPublishManager.class);
+
    private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
 
    private SimpleString managementAddress;
@@ -173,6 +176,7 @@ public class MQTTPublishManager {
                }
                tx.commit();
             } catch (Throwable t) {
+               logger.warn(t.getMessage(), t);
                tx.rollback();
                throw t;
             }
@@ -253,17 +257,17 @@ public class MQTTPublishManager {
       switch (message.getType()) {
          case Message.TEXT_TYPE:
             try {
-               SimpleString text = message.getBodyBuffer().readNullableSimpleString();
+               SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
                byte[] stringPayload = text.toString().getBytes("UTF-8");
                payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
                payload.writeBytes(stringPayload);
                break;
             } catch (UnsupportedEncodingException e) {
-               log.warn("Unable to send message: " + message.getMessageID() + " Cause: "
+ e.getMessage());
+               log.warn("Unable to send message: " + message.getMessageID() + " Cause: "
+ e.getMessage(), e);
             }
          default:
             ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
-            payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
+            payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e9c0eae/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index b997d80..a5b908f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback {
       try {
          session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
       } catch (Exception e) {
-         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(),
e);
       }
       return 1;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e9c0eae/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 6891497..e7b8c50 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
@@ -114,8 +113,7 @@ public class MQTTUtil {
       String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
       Message message = createServerMessage(session, new SimpleString(coreAddress), retain,
qos);
 
-      // FIXME does this involve a copy?
-      message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+      message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
       return message;
    }
 


Mime
View raw message