activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: fixing stomp
Date Fri, 03 Mar 2017 15:27:32 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/artemis-1009 8805118c3 -> b2e00086a


fixing stomp


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b2e00086
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b2e00086
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b2e00086

Branch: refs/heads/artemis-1009
Commit: b2e00086af4c369dcffb6602166ab2e974906981
Parents: 8805118
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Mar 3 10:27:12 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Mar 3 10:27:12 2017 -0500

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    |  3 +-
 .../core/protocol/stomp/StompSession.java       | 34 +++++++++++---------
 .../stomp/VersionedStompFrameHandler.java       |  8 ++---
 .../stomp/v12/StompFrameHandlerV12.java         |  4 ++-
 4 files changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e00086/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index d0dff4d..56067f1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -727,9 +727,10 @@ public final class StompConnection implements RemotingConnection {
    }
 
    public StompFrame createStompMessage(ICoreMessage serverMessage,
+                                        ActiveMQBuffer bodyBuffer,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
-      return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
+      return frameHandler.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
    }
 
    public void addStompEventListener(FrameEventListener listener) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e00086/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index ba706e5..c0e400c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,12 +25,16 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.zip.Inflater;
 
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -130,32 +134,31 @@ public class StompSession implements SessionCallback {
                           final ServerConsumer consumer,
                           int deliveryCount) {
 
-      //TODO-now: fix encoders
+      ICoreMessage  coreMessage = serverMessage.toCore();
+
       LargeServerMessageImpl largeMessage = null;
       ICoreMessage newServerMessage = serverMessage.toCore();
       try {
          StompSubscription subscription = subscriptions.get(consumer.getID());
-         StompFrame frame = null;
-         if (serverMessage.isLargeMessage()) {
+         StompFrame frame;
+         ActiveMQBuffer buffer;
 
-            largeMessage = (LargeServerMessageImpl) serverMessage;
-            LargeBodyEncoder encoder = largeMessage.getBodyEncoder();
+         if (coreMessage.isLargeMessage()) {
+            LargeBodyEncoder encoder = coreMessage.getBodyEncoder();
             encoder.open();
             int bodySize = (int) encoder.getLargeBodySize();
 
-            // TODO-now: Convert large mesasge body into the stomp message
-            //large message doesn't have a body.
-            // ((Message) newServerMessage).createBody(bodySize);
-//            encoder.encode(((ServerMessage)newServerMessage).getBodyBuffer(), bodySize);
-//            encoder.close();
+            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
 
-            throw new RuntimeException("Large message body won't work with stomp now");
+            encoder.encode(buffer, bodySize);
+            encoder.close();
+         } else {
+            buffer = coreMessage.getReadOnlyBodyBuffer();
          }
 
          if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
-            //decompress
-            ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer();
-            int bytesToRead = qbuff.writerIndex() - CoreMessage.BODY_OFFSET;
+            ActiveMQBuffer qbuff = buffer;
+            int bytesToRead = qbuff.readerIndex();
             Inflater inflater = new Inflater();
             inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
 
@@ -168,9 +171,10 @@ public class StompSession implements SessionCallback {
             qbuff.resetReaderIndex();
             qbuff.resetWriterIndex();
             qbuff.writeBytes(data);
+            buffer = qbuff;
          }
 
-         frame = connection.createStompMessage(newServerMessage, subscription, deliveryCount);
+         frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);
 
          int length = frame.getEncodedSize();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e00086/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 1e40d42..ee2682b 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -289,6 +289,7 @@ public abstract class VersionedStompFrameHandler {
    }
 
    public StompFrame createMessageFrame(ICoreMessage serverMessage,
+                                        ActiveMQBuffer bodyBuffer,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
@@ -297,14 +298,11 @@ public abstract class VersionedStompFrameHandler {
          frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
       }
 
-      // TODO-now fix encoders
-      ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
+      ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
 
       int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
 
-      buffer.readerIndex(CoreMessage.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
-
-      int size = bodyPos - buffer.readerIndex();
+      int size = buffer.writerIndex();
 
       byte[] data = new byte[size];
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2e00086/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 58d18ef..77a9225 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@@ -49,9 +50,10 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
 
    @Override
    public StompFrame createMessageFrame(ICoreMessage serverMessage,
+                                        ActiveMQBuffer bodyBuffer,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
-      StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);
+      StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
 
       if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
          frame.addHeader(Stomp.Headers.Message.ACK, String.valueOf(serverMessage.getMessageID()));


Mime
View raw message