activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4753
Date Tue, 08 Oct 2013 21:54:00 GMT
Updated Branches:
  refs/heads/trunk dc0291b29 -> 0f9a34799


https://issues.apache.org/jira/browse/AMQ-4753

Quick fix for getting past protocol discrimination and passing on proper
Buffers of data to the protocol converter.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0f9a3479
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0f9a3479
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0f9a3479

Branch: refs/heads/trunk
Commit: 0f9a34799630911b0d7906e166291ff5af0d852c
Parents: dc0291b
Author: Timothy Bish <tabish121@gmai.com>
Authored: Tue Oct 8 17:53:51 2013 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Tue Oct 8 17:53:51 2013 -0400

----------------------------------------------------------------------
 .../transport/amqp/AMQPSslTransportFactory.java | 41 +++++-----
 .../transport/amqp/AmqpNioSslTransport.java     | 32 ++++++--
 .../transport/amqp/AmqpTestSupport.java         | 21 +++++-
 .../transport/amqp/bugs/AMQ4753Test.java        | 78 ++++++++++++++++++++
 4 files changed, 145 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
index bd341a5..0612fd9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -25,9 +28,6 @@ import org.apache.activemq.transport.tcp.SslTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A <a href="http://amqp.org/">AMQP</a> over SSL transport factory
  */
@@ -35,12 +35,13 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
 
     private BrokerContext brokerContext = null;
 
+    @Override
     protected String getDefaultWireFormatType() {
         return "amqp";
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
-
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new AmqpTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
@@ -53,31 +54,33 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
         transport = super.serverConfigure(transport, format, options);
 
         // strip off the mutex transport.
-        if( transport instanceof MutexTransport ) {
-            transport = ((MutexTransport)transport).getNext();
+        if (transport instanceof MutexTransport) {
+            transport = ((MutexTransport) transport).getNext();
         }
 
-//        MutexTransport mutex = transport.narrow(MutexTransport.class);
-//        if (mutex != null) {
-//            mutex.setSyncOnCommand(true);
-//        }
+        // MutexTransport mutex = transport.narrow(MutexTransport.class);
+        // if (mutex != null) {
+        // mutex.setSyncOnCommand(true);
+        // }
 
         return transport;
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerContext = brokerService.getBrokerContext();
     }
 
-//    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
-//        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
-//
-//        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
-//        filter.setInactivityMonitor(monitor);
-//
-//        return monitor;
-//    }
-
+    // protected Transport createInactivityMonitor(Transport transport,
+    // WireFormat format) {
+    // AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport,
+    // format);
+    //
+    // AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+    // filter.setInactivityMonitor(monitor);
+    //
+    // return monitor;
+    // }
 
     @Override
     protected boolean isUseInactivityMonitor(Transport transport) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
index 9109244..d393aa7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -16,18 +16,22 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.wireformat.WireFormat;
-
-import javax.net.SocketFactory;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+
 public class AmqpNioSslTransport extends NIOSSLTransport {
 
+    private boolean magicRead;
+
     public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
     }
@@ -46,7 +50,23 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
 
     @Override
     protected void processCommand(ByteBuffer plain) throws Exception {
-        doConsume(AmqpSupport.toBuffer(plain));
-    }
 
+        byte[] fill = new byte[plain.remaining()];
+        plain.get(fill);
+
+        ByteBuffer payload = ByteBuffer.wrap(fill);
+
+        if (!magicRead) {
+            if (payload.remaining() >= 8) {
+                magicRead = true;
+                Buffer magic = new Buffer(8);
+                for (int i = 0; i < 8; i++) {
+                    magic.data[i] = payload.get();
+                }
+                doConsume(new AmqpHeader(magic));
+            }
+        }
+
+        doConsume(AmqpSupport.toBuffer(payload));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 2284a8c..5c5bcff 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp;
 
 import java.io.File;
+import java.security.SecureRandom;
 import java.util.Vector;
 
 import javax.jms.Connection;
@@ -27,6 +28,9 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
 
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -45,10 +49,11 @@ public class AmqpTestSupport {
     protected BrokerService brokerService;
     protected Vector<Throwable> exceptions = new Vector<Throwable>();
     protected int numberOfMessages;
-    AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {
-    };
+    AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
     protected int port;
     protected int sslPort;
+    protected int nioPort;
+    protected int nioPlusSslPort;
 
     public static void main(String[] args) throws Exception {
         final AmqpTestSupport s = new AmqpTestSupport();
@@ -72,6 +77,10 @@ public class AmqpTestSupport {
         brokerService.setPersistent(false);
         brokerService.setAdvisorySupport(false);
 
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+        SSLContext.setDefault(ctx);
+
         // Setup SSL context...
         final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
         File keystore = new File(classesDir, "../../src/test/resources/keystore");
@@ -91,8 +100,16 @@ public class AmqpTestSupport {
     protected void addAMQPConnector() throws Exception {
         TransportConnector connector = brokerService.addConnector("amqp+ssl://0.0.0.0:" + sslPort);
         sslPort = connector.getConnectUri().getPort();
+        LOG.debug("Using amqp+ssl port " + sslPort);
         connector = brokerService.addConnector("amqp://0.0.0.0:" + port);
         port = connector.getConnectUri().getPort();
+        LOG.debug("Using amqp port " + port);
+        connector = brokerService.addConnector("amqp+nio://0.0.0.0:" + nioPort);
+        nioPort = connector.getConnectUri().getPort();
+        LOG.debug("Using amqp+nio port " + nioPort);
+        connector = brokerService.addConnector("amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort);
+        nioPlusSslPort = connector.getConnectUri().getPort();
+        LOG.debug("Using amqp+nio+ssl port " + nioPlusSslPort);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
new file mode 100644
index 0000000..3be3482
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.transport.amqp.AmqpTestSupport;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.junit.Test;
+
+public class AMQ4753Test extends AmqpTestSupport {
+
+    @Test(timeout = 120 * 1000)
+    public void testAmqpNioPlusSslSendReceive() throws JMSException{
+        Connection connection = createAMQPConnection(nioPlusSslPort, true);
+        runSimpleSendReceiveTest(connection);
+    }
+
+    public void runSimpleSendReceiveTest(Connection connection) throws JMSException{
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage();
+        String messageText = "hello  sent at " + new java.util.Date().toString();
+        message.setText(messageText);
+        producer.send(message);
+
+        // Get the message we just sent
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+        Message receivedMessage = consumer.receive(5000);
+        assertNotNull(receivedMessage);
+        assertTrue(receivedMessage instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) receivedMessage;
+        assertEquals(messageText, textMessage.getText());
+        connection.close();
+    }
+
+    private Connection createAMQPConnection(int testPort, boolean useSSL) throws JMSException {
+        LOG.debug("In createConnection using port {} ssl? {}", testPort, useSSL);
+        final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", testPort, "admin", "password", null, useSSL);
+        final Connection connection = connectionFactory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+        connection.start();
+        return connection;
+    }
+}


Mime
View raw message