activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kea...@apache.org
Subject git commit: Fix for AMQ-5093. amqp+nio and amqp+nio+ssl were failing on large messages
Date Tue, 11 Mar 2014 11:00:33 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 570dbb437 -> 87420cc45


Fix for AMQ-5093. amqp+nio and amqp+nio+ssl were failing on large messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/87420cc4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/87420cc4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/87420cc4

Branch: refs/heads/trunk
Commit: 87420cc455f6e02d5249bfe29b105f4e1e345969
Parents: 570dbb4
Author: Kevin Earls <kevin@kevinearls.com>
Authored: Tue Mar 11 12:00:16 2014 +0100
Committer: Kevin Earls <kevin@kevinearls.com>
Committed: Tue Mar 11 12:00:16 2014 +0100

----------------------------------------------------------------------
 .../transport/amqp/AmqpNioSslTransport.java     | 137 +--------------
 .../transport/amqp/AmqpNioTransport.java        |  64 ++-----
 .../transport/amqp/AmqpNioTransportHelper.java  | 175 +++++++++++++++++++
 .../activemq/transport/amqp/JMSClientTest.java  |  36 ++++
 .../transport/amqp/joram/JoramJmsNioTest.java   |   2 +
 5 files changed, 229 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
index 76e6f64..c569f05 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -18,13 +18,8 @@ package org.apache.activemq.transport.amqp;
 
 import org.apache.activemq.transport.nio.NIOSSLTransport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtbuf.Buffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.net.SocketFactory;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
@@ -32,10 +27,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 public class AmqpNioSslTransport extends NIOSSLTransport {
-    private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new
byte[]{'A', 'M', 'Q', 'P'}));
-    public final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
-    private static final Logger LOG = LoggerFactory.getLogger(AmqpNioSslTransport.class);
-    private boolean magicConsumed = false;
+    private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
 
     public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -55,131 +47,6 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
 
     @Override
     protected void processCommand(ByteBuffer plain) throws Exception {
-        // Are we waiting for the next Command or are we building on the current one?  The
-        // frame size is in the first 4 bytes.
-        if (nextFrameSize == -1) {
-            // We can get small packets that don't give us enough for the frame size
-            // so allocate enough for the initial size value and
-            if (plain.remaining() < 4) {
-                if (currentBuffer == null) {
-                    currentBuffer = ByteBuffer.allocate(4);
-                }
-
-                // Go until we fill the integer sized current buffer.
-                while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
-                    currentBuffer.put(plain.get());
-                }
-
-                // Didn't we get enough yet to figure out next frame size.
-                if (currentBuffer.hasRemaining()) {
-                    return;
-                } else {
-                    currentBuffer.flip();
-                    nextFrameSize = currentBuffer.getInt();
-                }
-            } else {
-                // Either we are completing a previous read of the next frame size or its
-                // fully contained in plain already.
-                if (currentBuffer != null) {
-                    // Finish the frame size integer read and get from the current buffer.
-                    while (currentBuffer.hasRemaining()) {
-                        currentBuffer.put(plain.get());
-                    }
-
-                    currentBuffer.flip();
-                    nextFrameSize = currentBuffer.getInt();
-                } else {
-                    nextFrameSize = plain.getInt();
-                }
-            }
-        }
-
-        // There are three possibilities when we get here.  We could have a partial frame,
-        // a full frame, or more than 1 frame
-        while (true) {
-            LOG.debug("Entering while loop with plain.position {} remaining {} ", plain.position(),
plain.remaining());
-            // handle headers, which start with 'A','M','Q','P' rather than size
-            if (nextFrameSize == AMQP_HEADER_VALUE) {
-                nextFrameSize = handleAmqpHeader(plain);
-                if (nextFrameSize == -1) {
-                    return;
-                }
-            }
-
-            validateFrameSize(nextFrameSize);
-
-            // now we have the data, let's reallocate and try to fill it,  (currentBuffer.putInt()
is called
-            // because we need to put back the 4 bytes we read to determine the size)
-            currentBuffer = ByteBuffer.allocate(nextFrameSize );
-            currentBuffer.putInt(nextFrameSize);
-            if (currentBuffer.remaining() >= plain.remaining()) {
-                currentBuffer.put(plain);
-            } else {
-                byte[] fill = new byte[currentBuffer.remaining()];
-                plain.get(fill);
-                currentBuffer.put(fill);
-            }
-
-            // Either we have enough data for a new command or we have to wait for some more.
 If hasRemaining is true,
-            // we have not filled the buffer yet, i.e. we haven't received the full frame.
-            if (currentBuffer.hasRemaining()) {
-                return;
-            } else {
-                currentBuffer.flip();
-                LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(),
currentBuffer.limit());
-                doConsume(AmqpSupport.toBuffer(currentBuffer));
-
-                // Determine if there are more frames to process
-                if (plain.hasRemaining()) {
-                    if (plain.remaining() < 4) {
-                        nextFrameSize = 4;
-                    } else {
-                        nextFrameSize = plain.getInt();
-                    }
-                } else {
-                    nextFrameSize = -1;
-                    currentBuffer = null;
-                    return;
-                }
-            }
-        }
+        amqpNioTransportHelper.processCommand(plain);
     }
-
-    private void validateFrameSize(int frameSize) throws IOException {
-        if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
-            throw new IOException("Frame size of " + nextFrameSize +
-                    "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
-        }
-    }
-
-    private int handleAmqpHeader(ByteBuffer plain) {
-        int nextFrameSize;
-
-        LOG.debug("Consuming AMQP_HEADER");
-        currentBuffer = ByteBuffer.allocate(8);
-        currentBuffer.putInt(AMQP_HEADER_VALUE);
-        while (currentBuffer.hasRemaining()) {
-            currentBuffer.put(plain.get());
-        }
-        currentBuffer.flip();
-        if (!magicConsumed) {   // The first case we see is special and has to be handled
differently
-            doConsume(new AmqpHeader(new Buffer(currentBuffer)));
-            magicConsumed = true;
-        } else {
-            doConsume(AmqpSupport.toBuffer(currentBuffer));
-        }
-
-        if (plain.hasRemaining()) {
-            if (plain.remaining() < 4) {
-                nextFrameSize = 4;
-            } else {
-                nextFrameSize = plain.getInt();
-            }
-        } else {
-            nextFrameSize = -1;
-            currentBuffer = null;
-        }
-        return nextFrameSize;
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index e94cb0b..ee2694c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,8 +16,17 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.SocketFactory;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -28,26 +37,14 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
-import javax.net.SocketFactory;
-
-import org.apache.activemq.transport.nio.NIOOutputStream;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtbuf.Buffer;
-
 /**
  * An implementation of the {@link org.apache.activemq.transport.Transport} interface for
using AMQP over NIO
  */
 public class AmqpNioTransport extends TcpTransport {
-    private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new
byte[]{'A', 'M', 'Q', 'P'}));
-    private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
-
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class);
     private SocketChannel channel;
     private SelectorSelection selection;
+    private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
 
     private ByteBuffer inputBuffer;
 
@@ -110,40 +107,7 @@ public class AmqpNioTransport extends TcpTransport {
                 receiveCounter += readSize;
 
                 inputBuffer.flip();
-
-                if( !magicRead ) {
-                    if( inputBuffer.remaining()>= 8 ) {
-                        magicRead = true;
-                        Buffer magic = new Buffer(8);
-                        for (int i = 0; i < 8; i++) {
-                            magic.data[i] = inputBuffer.get();
-                        }
-                        doConsume(new AmqpHeader(magic));
-                    } else {
-                        inputBuffer.flip();
-                        continue;
-                    }
-                }
-
-                while(inputBuffer.position() < inputBuffer.limit()) {
-                    inputBuffer.mark();
-                    int commandSize = inputBuffer.getInt();
-                    inputBuffer.reset();
-
-                    // handles buffers starting with 'A','M','Q','P' rather than size
-                    if (commandSize == AMQP_HEADER_VALUE) {
-                        doConsume(AmqpSupport.toBuffer(inputBuffer));
-                        break;
-                    }
-
-                    byte[] bytes = new byte[commandSize];
-                    ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize);
-                    inputBuffer.get(bytes, 0, commandSize);
-                    commandBuffer.put(bytes);
-                    commandBuffer.flip();
-                    doConsume(AmqpSupport.toBuffer(commandBuffer));
-                    commandBuffer.clear();
-                }
+                amqpNioTransportHelper.processCommand(inputBuffer);
 
                 // clear the buffer
                 inputBuffer.clear();

http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
new file mode 100644
index 0000000..09cab5d
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.TransportSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AmqpNioTransportHelper {
+    private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new
byte[]{'A', 'M', 'Q', 'P'}));
+    private final Integer AMQP_HEADER_VALUE;
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransportHelper.class);
+    protected int nextFrameSize = -1;
+    protected ByteBuffer currentBuffer;
+    private boolean magicConsumed = false;
+    private TransportSupport transportSupport;
+
+    public AmqpNioTransportHelper(TransportSupport transportSupport) throws IOException {
+        AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
+        this.transportSupport = transportSupport;
+    }
+
+    protected void processCommand(ByteBuffer plain) throws Exception {
+        // Are we waiting for the next Command or building on the current one?  The frame
size is in the first 4 bytes.
+        if (nextFrameSize == -1) {
+            // We can get small packets that don't give us enough for the frame size
+            // so allocate enough for the initial size value and
+            if (plain.remaining() < 4) {
+                if (currentBuffer == null) {
+                    currentBuffer = ByteBuffer.allocate(4);
+                }
+
+                // Go until we fill the integer sized current buffer.
+                while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
+                    currentBuffer.put(plain.get());
+                }
+
+                // Didn't we get enough yet to figure out next frame size.
+                if (currentBuffer.hasRemaining()) {
+                    return;
+                } else {
+                    currentBuffer.flip();
+                    nextFrameSize = currentBuffer.getInt();
+                }
+            } else {
+                // Either we are completing a previous read of the next frame size or its
+                // fully contained in plain already.
+                if (currentBuffer != null) {
+                    // Finish the frame size integer read and get from the current buffer.
+                    while (currentBuffer.hasRemaining()) {
+                        currentBuffer.put(plain.get());
+                    }
+
+                    currentBuffer.flip();
+                    nextFrameSize = currentBuffer.getInt();
+                } else {
+                    nextFrameSize = plain.getInt();
+                }
+            }
+        }
+
+        // There are three possibilities when we get here.  We could have a partial frame,
+        // a full frame, or more than 1 frame
+        while (true) {
+            // handle headers, which start with 'A','M','Q','P' rather than size
+            if (nextFrameSize == AMQP_HEADER_VALUE) {
+                nextFrameSize = handleAmqpHeader(plain);
+                if (nextFrameSize == -1) {
+                    return;
+                }
+            }
+            validateFrameSize(nextFrameSize);
+
+            // now we have the data, let's reallocate and try to fill it,  (currentBuffer.putInt()
is called      TODO update
+            // because we need to put back the 4 bytes we read to determine the size)
+            if (currentBuffer == null || (currentBuffer.limit() == 4)) {
+                currentBuffer = ByteBuffer.allocate(nextFrameSize);
+                currentBuffer.putInt(nextFrameSize);
+            }
+
+            if (currentBuffer.remaining() >= plain.remaining()) {
+                currentBuffer.put(plain);
+            } else {
+                byte[] fill = new byte[currentBuffer.remaining()];
+                plain.get(fill);
+                currentBuffer.put(fill);
+            }
+
+            // Either we have enough data for a new command or we have to wait for some more.
 If hasRemaining is true,
+            // we have not filled the buffer yet, i.e. we haven't received the full frame.
+            if (currentBuffer.hasRemaining()) {
+                return;
+            } else {
+                currentBuffer.flip();
+                LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(),
currentBuffer.limit());
+                transportSupport.doConsume(AmqpSupport.toBuffer(currentBuffer));
+                currentBuffer = null;
+                nextFrameSize = -1;
+
+                // Determine if there are more frames to process
+                if (plain.hasRemaining()) {
+                    if (plain.remaining() < 4) {
+                        currentBuffer = ByteBuffer.allocate(4);
+                        while (currentBuffer.hasRemaining() && plain.hasRemaining())
{
+                            currentBuffer.put(plain.get());
+                        }
+                        return;
+                    } else {
+                        nextFrameSize = plain.getInt();
+                    }
+                } else {
+                    return;
+                }
+            }
+        }
+    }
+
+    private void validateFrameSize(int frameSize) throws IOException {
+        if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
+            throw new IOException("Frame size of " + nextFrameSize +
+                    "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
+        }
+    }
+
+    private int handleAmqpHeader(ByteBuffer plain) {
+        int nextFrameSize;
+
+        LOG.debug("Consuming AMQP_HEADER");
+        currentBuffer = ByteBuffer.allocate(8);
+        currentBuffer.putInt(AMQP_HEADER_VALUE);
+        while (currentBuffer.hasRemaining()) {
+            currentBuffer.put(plain.get());
+        }
+        currentBuffer.flip();
+        if (!magicConsumed) {   // The first case we see is special and has to be handled
differently
+            transportSupport.doConsume(new AmqpHeader(new Buffer(currentBuffer)));
+            magicConsumed = true;
+        } else {
+            transportSupport.doConsume(AmqpSupport.toBuffer(currentBuffer));
+        }
+
+        if (plain.hasRemaining()) {
+            if (plain.remaining() < 4) {
+                nextFrameSize = 4;
+            } else {
+                nextFrameSize = plain.getInt();
+            }
+        } else {
+            nextFrameSize = -1;
+            currentBuffer = null;
+        }
+        return nextFrameSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 6239904..7ae8ef7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -784,6 +784,42 @@ public class JMSClientTest extends AmqpTestSupport {
         connection.close();
     }
 
+    private String createLargeString(int sizeInBytes) {
+        byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < sizeInBytes; i++) {
+            builder.append(base[i % base.length]);
+        }
+
+        LOG.debug("Created string with size : " + builder.toString().getBytes().length +
" bytes");
+        return builder.toString();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendLargeMessage() throws JMSException, InterruptedException {
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String queueName = name.toString();
+        Queue queue = session.createQueue(queueName);
+
+        MessageProducer producer=session.createProducer(queue);
+        int messageSize = 1024 * 1024;
+        String messageText = createLargeString(messageSize);
+        Message m=session.createTextMessage(messageText);
+        LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName);
+        producer.send(m);
+
+        MessageConsumer  consumer=session.createConsumer(queue);
+
+        Message message = consumer.receive();
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+        LOG.debug(">>>> Received message of length {}", textMessage.getText().length());
+        assertEquals(messageSize, textMessage.getText().length());
+        assertEquals(messageText, textMessage.getText());
+    }
+
     private Connection createConnection() throws JMSException {
         return createConnection(name.toString(), false, false);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java
index afb6ece..7c78d0f 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.joram;
 
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
@@ -41,6 +42,7 @@ import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
 /**
  * Run the JoramJmsTests using amqp+nio
  */
+@Ignore("AMQ-5094")
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
     // TopicSessionTest.class,    // Hangs, see https://issues.apache.org/jira/browse/PROTON-154


Mime
View raw message