activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [02/19] git commit: AMQ-5042
Date Thu, 20 Mar 2014 20:15:32 GMT
AMQ-5042


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/acc62253
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/acc62253
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/acc62253

Branch: refs/heads/activemq-5.9
Commit: acc62253013fddcd3a60943684cc668683404173
Parents: afe0f18
Author: Hadrian Zbarcea <hadrian@apache.org>
Authored: Thu Mar 20 15:04:49 2014 -0400
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Thu Mar 20 15:04:49 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpNioTransport.java        | 28 +++++++++++++++--
 .../activemq/transport/amqp/JMSClientTest.java  | 32 +++++++++++++++++---
 2 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/acc62253/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index 665bf88..dfb5f60 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -29,7 +30,6 @@ import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
-import org.apache.activemq.transport.nio.NIOInputStream;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
@@ -38,11 +38,15 @@ import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
  */
 public class AmqpNioTransport extends TcpTransport {
+    private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
+    private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
 
     private SocketChannel channel;
     private SelectorSelection selection;
@@ -120,10 +124,28 @@ public class AmqpNioTransport extends TcpTransport {
                     }
                 }
 
-                doConsume(AmqpSupport.toBuffer(inputBuffer));
+                while(inputBuffer.position() < inputBuffer.limit()) {
+                    inputBuffer.mark();
+                    int commandSize = inputBuffer.getInt();
+                    inputBuffer.reset();
+
+                    // handles buffers starting with 'A','M','Q','P' rather than size
+                    if (commandSize == AMQP_HEADER_VALUE) {
+                        doConsume(AmqpSupport.toBuffer(inputBuffer));
+                        break;
+                    }
+
+                    byte[] bytes = new byte[commandSize];
+                    ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize);
+                    inputBuffer.get(bytes, 0, commandSize);
+                    commandBuffer.put(bytes);
+                    commandBuffer.flip();
+                    doConsume(AmqpSupport.toBuffer(commandBuffer));
+                    commandBuffer.clear();
+                }
+
                 // clear the buffer
                 inputBuffer.clear();
-
             }
         } catch (IOException e) {
             onException(e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/acc62253/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 69237db..03bc8e3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -1,4 +1,4 @@
-/**
+/**                           >>>>>> pumping
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -47,14 +47,36 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.objectweb.jtests.jms.framework.TestConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JMSClientTest extends AmqpTestSupport {
-
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
     @Rule public TestName name = new TestName();
+    java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
+
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        LOG.debug("in setUp of {}", name.getMethodName());
+        super.setUp();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        LOG.debug("in tearDown of {}", name.getMethodName());
+        super.tearDown();
+        Thread.sleep(500);
+    }
 
     @SuppressWarnings("rawtypes")
     @Test
@@ -161,7 +183,7 @@ public class JMSClientTest extends AmqpTestSupport {
         connection.close();
     }
 
-    @Test
+    @Test(timeout=60000)
     public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
 
         ActiveMQAdmin.enableJMSFrameTracing();
@@ -744,7 +766,9 @@ public class JMSClientTest extends AmqpTestSupport {
 
     private Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
 
-        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
+        int brokerPort = getBrokerPort();
+        LOG.debug("Creating connection on port {}", brokerPort);
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password");
 
         factory.setSyncPublish(syncPublish);
         factory.setTopicPrefix("topic://");


Mime
View raw message