activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6339
Date Thu, 30 Jun 2016 19:14:21 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 83827f277 -> 31c55f751


http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
index 4110fcb..3029668 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -23,6 +23,8 @@ import java.util.Map;
 
 import javax.servlet.Servlet;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.transport.SocketConnectorFactory;
 import org.apache.activemq.transport.WebTransportServerSupport;
@@ -41,10 +43,13 @@ import org.slf4j.LoggerFactory;
  * Creates a web server and registers web socket server
  *
  */
-public class WSTransportServer extends WebTransportServerSupport {
+public class WSTransportServer extends WebTransportServerSupport implements BrokerServiceAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(WSTransportServer.class);
 
+    private BrokerService brokerService;
+    private WSServlet servlet;
+
     public WSTransportServer(URI location) {
         super(location);
         this.bindAddress = location;
@@ -105,8 +110,10 @@ public class WSTransportServer extends WebTransportServerSupport {
     }
 
     private Servlet createWSServlet() throws Exception {
-        WSServlet servlet = new WSServlet();
+        servlet = new WSServlet();
         servlet.setTransportOptions(transportOptions);
+        servlet.setBrokerService(brokerService);
+
         return servlet;
     }
 
@@ -147,4 +154,12 @@ public class WSTransportServer extends WebTransportServerSupport {
     public boolean isSslServer() {
         return false;
     }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+        if (servlet != null) {
+            servlet.setBrokerService(brokerService);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
index 338be98..1f7c5e7 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
@@ -18,17 +18,26 @@
 package org.apache.activemq.transport.ws.jetty9;
 
 import java.io.IOException;
-import java.util.*;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.activemq.jms.pool.IntrospectionSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.util.HttpTransportUtils;
+import org.apache.activemq.transport.ws.WSTransportProxy;
 import org.eclipse.jetty.websocket.api.WebSocketListener;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -39,16 +48,21 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 /**
  * Handle connection upgrade requests and creates web sockets
  */
-public class WSServlet extends WebSocketServlet {
+public class WSServlet extends WebSocketServlet implements BrokerServiceAware {
 
     private static final long serialVersionUID = -4716657876092884139L;
 
     private TransportAcceptListener listener;
 
-    private final static Map<String, Integer> stompProtocols = new ConcurrentHashMap<> ();
-    private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<> ();
+    private final static Map<String, Integer> stompProtocols = new ConcurrentHashMap<>();
+    private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<>();
 
     private Map<String, Object> transportOptions;
+    private BrokerService brokerService;
+
+    private enum Protocol {
+        MQTT, STOMP, UNKNOWN
+    }
 
     static {
         stompProtocols.put("v12.stomp", 3);
@@ -80,41 +94,98 @@ public class WSServlet extends WebSocketServlet {
             @Override
             public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
                 WebSocketListener socket;
-                boolean isMqtt = false;
-                for (String subProtocol : req.getSubProtocols()) {
-                    if (subProtocol.startsWith("mqtt")) {
-                        isMqtt = true;
+                Protocol requestedProtocol = Protocol.UNKNOWN;
+
+                // When no sub-protocol is requested we default to STOMP for legacy reasons.
+                if (!req.getSubProtocols().isEmpty()) {
+                    for (String subProtocol : req.getSubProtocols()) {
+                        if (subProtocol.startsWith("mqtt")) {
+                            requestedProtocol = Protocol.MQTT;
+                        } else if (subProtocol.contains("stomp")) {
+                            requestedProtocol = Protocol.STOMP;
+                        }
                     }
-                }
-                if (isMqtt) {
-                    socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
-                    resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), "mqtt"));
-                    ((MQTTSocket)socket).setTransportOptions(new HashMap(transportOptions));
-                    ((MQTTSocket)socket).setPeerCertificates(req.getCertificates());
                 } else {
-                    socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
-                    ((StompSocket)socket).setCertificates(req.getCertificates());
-                    resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols,req.getSubProtocols(), "stomp"));
+                    requestedProtocol = Protocol.STOMP;
+                }
+
+                switch (requestedProtocol) {
+                    case MQTT:
+                        socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
+                        ((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions));
+                        ((MQTTSocket) socket).setPeerCertificates(req.getCertificates());
+                        resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt"));
+                        break;
+                    case UNKNOWN:
+                        socket = findWSTransport(req, resp);
+                        if (socket != null) {
+                            break;
+                        }
+                    case STOMP:
+                        socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
+                        ((StompSocket) socket).setPeerCertificates(req.getCertificates());
+                        resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols, req.getSubProtocols(), "stomp"));
+                        break;
+                    default:
+                        socket = null;
+                        listener.onAcceptError(new IOException("Unknown protocol requested"));
+                        break;
+                }
+
+                if (socket != null) {
+                    listener.onAccept((Transport) socket);
                 }
-                listener.onAccept((Transport) socket);
+
                 return socket;
             }
         });
     }
 
-    private String getAcceptedSubProtocol(final Map<String, Integer> protocols,
-            List<String> subProtocols, String defaultProtocol) {
+    private WebSocketListener findWSTransport(ServletUpgradeRequest request, ServletUpgradeResponse response) {
+        WSTransportProxy proxy = null;
+
+        for (String subProtocol : request.getSubProtocols()) {
+            try {
+                String remoteAddress = HttpTransportUtils.generateWsRemoteAddress(request.getHttpServletRequest(), subProtocol);
+                URI remoteURI = new URI(remoteAddress);
+
+                TransportFactory factory = TransportFactory.findTransportFactory(remoteURI);
+
+                if (factory instanceof BrokerServiceAware) {
+                    ((BrokerServiceAware) factory).setBrokerService(brokerService);
+                }
+
+                Transport transport = factory.doConnect(remoteURI);
+
+                proxy = new WSTransportProxy(remoteAddress, transport);
+                proxy.setPeerCertificates(request.getCertificates());
+                proxy.setTransportOptions(transportOptions);
+
+                response.setAcceptedSubProtocol(proxy.getSubProtocol());
+            } catch (Exception e) {
+                proxy = null;
+
+                // Keep going and try any other sub-protocols present.
+                continue;
+            }
+        }
+
+        return proxy;
+    }
+
+    private String getAcceptedSubProtocol(final Map<String, Integer> protocols, List<String> subProtocols, String defaultProtocol) {
         List<SubProtocol> matchedProtocols = new ArrayList<>();
         if (subProtocols != null && subProtocols.size() > 0) {
-            //detect which subprotocols match accepted protocols and add to the list
+            // detect which subprotocols match accepted protocols and add to the
+            // list
             for (String subProtocol : subProtocols) {
                 Integer priority = protocols.get(subProtocol);
-                if(subProtocol != null && priority != null) {
-                    //only insert if both subProtocol and priority are not null
+                if (subProtocol != null && priority != null) {
+                    // only insert if both subProtocol and priority are not null
                     matchedProtocols.add(new SubProtocol(subProtocol, priority));
                 }
             }
-            //sort the list by priority
+            // sort the list by priority
             if (matchedProtocols.size() > 0) {
                 Collections.sort(matchedProtocols, new Comparator<SubProtocol>() {
                     @Override
@@ -131,6 +202,7 @@ public class WSServlet extends WebSocketServlet {
     private class SubProtocol {
         private String protocol;
         private Integer priority;
+
         public SubProtocol(String protocol, Integer priority) {
             this.protocol = protocol;
             this.priority = priority;
@@ -140,4 +212,9 @@ public class WSServlet extends WebSocketServlet {
     public void setTransportOptions(Map<String, Object> transportOptions) {
         this.transportOptions = transportOptions;
     }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
index 34e8502..05a8159 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
@@ -22,6 +22,8 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
@@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport;
 /**
  * Factory for Secure WebSocket (wss) transport
  */
-public class WSSTransportFactory extends TransportFactory {
+public class WSSTransportFactory extends TransportFactory implements BrokerServiceAware {
+
+    private BrokerService brokerService;
 
     @Override
     public TransportServer doBind(URI location) throws IOException {
@@ -44,9 +48,15 @@ public class WSSTransportFactory extends TransportFactory {
             IntrospectionSupport.setProperties(result, transportOptions);
             result.setTransportOption(transportOptions);
             result.setHttpOptions(httpOptions);
+            result.setBrokerService(brokerService);
             return result;
         } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
     }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
index fdbf867..c4e8c47 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
@@ -250,9 +250,6 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String)
-     */
     @Override
     public void onWebSocketClose(int statusCode, String reason) {
         LOG.trace("MQTT WS Connection closed, code:{} message:{}", statusCode, reason);
@@ -263,14 +260,9 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
 
     }
 
-    /* (non-Javadoc)
-     * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
-     */
     @Override
-    public void onWebSocketConnect(
-            org.eclipse.jetty.websocket.api.Session session) {
+    public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
         this.connection = session;
         this.connectLatch.countDown();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
index 83cbd69..844c661 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
@@ -43,13 +43,9 @@ public class StompWSConnectionTimeoutTest extends WSTransportTestSupport {
         super.setUp();
         wsStompConnection = new StompWSConnection();
 
-//        WebSocketClientFactory clientFactory = new WebSocketClientFactory();
-//        clientFactory.start();
         wsClient = new WebSocketClient();
         wsClient.start();
 
-
-
         wsClient.connect(wsStompConnection, wsConnectUri);
         if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
             throw new IOException("Could not connect to STOMP WS endpoint");

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
index edf7b6c..c83f24d 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -69,18 +69,16 @@ public class WSTransportTestSupport {
         LOG.info("========== Finished test: {} ==========", name.getMethodName());
     }
 
-//    protected String getWSConnectorURI() {
-//        return "ws://127.0.0.1:" + getProxyPort() +
-//            "?allowLinkStealing=" + isAllowLinkStealing() +
-//            "&websocket.maxTextMessageSize=99999&" +
-//            "transport.maxIdleTime=1001";
-//    }
+    protected String getWSConnectionURI() {
+        return "ws://127.0.0.1:" + getProxyPort();
+    }
 
     protected String getWSConnectorURI() {
         return "ws://127.0.0.1:" + getProxyPort() +
-            "?allowLinkStealing=" + isAllowLinkStealing() +
-            "&websocket.maxTextMessageSize=99999&" +
-            "transport.idleTimeout=1001";
+               "?allowLinkStealing=" + isAllowLinkStealing() +
+               "&websocket.maxTextMessageSize=99999" +
+               "&transport.idleTimeout=1001" +
+               "&trace=true&transport.trace=true";
     }
 
     protected boolean isAllowLinkStealing() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties
index aa64270..2aabf97 100755
--- a/activemq-http/src/test/resources/log4j.properties
+++ b/activemq-http/src/test/resources/log4j.properties
@@ -20,8 +20,10 @@
 #
 log4j.rootLogger=INFO, out, stdout
 
-log4j.logger.org.apache.activemq.transport.ws=DEBUG
+log4j.logger.org.apache.activemq.transport.ws=TRACE
 log4j.logger.org.apache.activemq.transport.http=DEBUG
+log4j.logger.org.apache.activemq.transport.amqp=TRACE
+log4j.logger.org.apache.activemq.transport.amqp.FRAMES=TRACE
 
 #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index ef3f003..a5f114e 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -64,10 +64,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-amqp</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-partition</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
deleted file mode 100644
index c4a2fd7..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.conversions;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.qpid.jms.JmsConnectionFactory;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AmqpAndMqttTest {
-
-    protected BrokerService broker;
-    private TransportConnector amqpConnector;
-    private TransportConnector mqttConnector;
-
-    @Before
-    public void setUp() throws Exception {
-        broker = createBroker();
-        broker.start();
-        broker.waitUntilStarted();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-            broker.waitUntilStopped();
-            broker = null;
-        }
-    }
-
-    protected BrokerService createBroker() throws Exception {
-        BrokerService broker = new BrokerService();
-        broker.setPersistent(false);
-        broker.setUseJmx(false);
-        broker.setAdvisorySupport(false);
-        broker.setSchedulerSupport(false);
-
-        amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
-        mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
-
-        return broker;
-    }
-
-    @Test(timeout = 60000)
-    public void testFromMqttToAmqp() throws Exception {
-        Connection amqp = createAmqpConnection();
-        Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO"));
-
-        final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
-        mqtt.connect();
-        byte[] payload = bytes("Hello World");
-        mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
-        mqtt.disconnect();
-
-        Message msg = consumer.receive(1000 * 5);
-        assertNotNull(msg);
-        assertTrue(msg instanceof BytesMessage);
-
-        BytesMessage bmsg = (BytesMessage) msg;
-        byte[] actual = new byte[(int) bmsg.getBodyLength()];
-        bmsg.readBytes(actual);
-        assertTrue(Arrays.equals(actual, payload));
-        amqp.close();
-    }
-
-    private byte[] bytes(String value) {
-        try {
-            return value.getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    protected MQTT createMQTTConnection() throws Exception {
-        MQTT mqtt = new MQTT();
-        mqtt.setConnectAttemptsMax(1);
-        mqtt.setReconnectAttemptsMax(0);
-        mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
-        return mqtt;
-    }
-
-    public Connection createAmqpConnection() throws Exception {
-
-        String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort();
-
-        final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
-
-        factory.setUsername("admin");
-        factory.setPassword("password");
-
-        final Connection connection = factory.createConnection();
-        connection.setExceptionListener(new ExceptionListener() {
-            @Override
-            public void onException(JMSException exception) {
-                exception.printStackTrace();
-            }
-        });
-        connection.start();
-        return connection;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
index 8fb70ec..24a1dcd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
@@ -17,26 +17,27 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.security.cert.X509Certificate;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
 
-/**
- * 
- * 
- */
 public class StubTransport extends TransportSupport {
 
     private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
     private volatile int receiveCounter;
 
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
     }
 
+    @Override
     protected void doStart() throws Exception {
     }
 
+    @Override
     public void oneway(Object command) throws IOException {
         receiveCounter++;
         queue.add(command);
@@ -46,12 +47,28 @@ public class StubTransport extends TransportSupport {
         return queue;
     }
 
+    @Override
     public String getRemoteAddress() {
         return null;
     }
 
+    @Override
     public int getReceiveCounter() {
         return receiveCounter;
     }
 
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
index 9054e1a..82d87ba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
@@ -35,7 +35,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-
 @RunWith(Parameterized.class)
 public class AutoTransportConfigureTest {
 
@@ -49,12 +48,7 @@ public class AutoTransportConfigureTest {
 
     @Parameters
     public static Iterable<Object[]> parameters() {
-        return Arrays.asList(new Object[][]{
-                {"auto"},
-                {"auto+nio"},
-                {"auto+ssl"},
-                {"auto+nio+ssl"}
-        });
+        return Arrays.asList(new Object[][] { { "auto" }, { "auto+nio" }, { "auto+ssl" }, { "auto+nio+ssl" } });
     }
 
     private String transportType;
@@ -76,7 +70,7 @@ public class AutoTransportConfigureTest {
     }
 
     @After
-    public void tearDown() throws Exception{
+    public void tearDown() throws Exception {
         if (this.brokerService != null) {
             this.brokerService.stop();
             this.brokerService.waitUntilStopped();
@@ -92,7 +86,7 @@ public class AutoTransportConfigureTest {
 
     }
 
-    @Test(expected=JMSException.class)
+    @Test(expected = JMSException.class)
     public void testUrlConfiguration() throws Exception {
         createBroker(transportType + "://localhost:0?wireFormat.maxFrameSize=10");
 
@@ -100,7 +94,7 @@ public class AutoTransportConfigureTest {
         sendMessage(factory.createConnection());
     }
 
-    @Test(expected=JMSException.class)
+    @Test(expected = JMSException.class)
     public void testUrlConfigurationOpenWireFail() throws Exception {
         createBroker(transportType + "://localhost:0?wireFormat.default.maxFrameSize=10");
 
@@ -110,17 +104,17 @@ public class AutoTransportConfigureTest {
 
     @Test
     public void testUrlConfigurationOpenWireSuccess() throws Exception {
-        //Will work because max frame size only applies to amqp
-        createBroker(transportType + "://localhost:0?wireFormat.amqp.maxFrameSize=10");
+        // Will work because max frame size only applies to stomp
+        createBroker(transportType + "://localhost:0?wireFormat.stomp.maxFrameSize=10");
 
         ConnectionFactory factory = new ActiveMQConnectionFactory(url);
         sendMessage(factory.createConnection());
     }
 
-    @Test(expected=JMSException.class)
+    @Test(expected = JMSException.class)
     public void testUrlConfigurationOpenWireNotAvailable() throws Exception {
-        //only amqp is available so should fail
-        createBroker(transportType + "://localhost:0?auto.protocols=amqp");
+        // only stomp is available so should fail
+        createBroker(transportType + "://localhost:0?auto.protocols=stomp");
 
         ConnectionFactory factory = new ActiveMQConnectionFactory(url);
         sendMessage(factory.createConnection());
@@ -128,7 +122,7 @@ public class AutoTransportConfigureTest {
 
     @Test
     public void testUrlConfigurationOpenWireAvailable() throws Exception {
-        //only open wire is available
+        // only open wire is available
         createBroker(transportType + "://localhost:0?auto.protocols=default");
 
         ConnectionFactory factory = new ActiveMQConnectionFactory(url);
@@ -137,13 +131,12 @@ public class AutoTransportConfigureTest {
 
     @Test
     public void testUrlConfigurationOpenWireAndAmqpAvailable() throws Exception {
-        createBroker(transportType + "://localhost:0?auto.protocols=default,amqp");
+        createBroker(transportType + "://localhost:0?auto.protocols=default,stomp");
 
         ConnectionFactory factory = new ActiveMQConnectionFactory(url);
         sendMessage(factory.createConnection());
     }
 
-
     protected void sendMessage(Connection connection) throws JMSException {
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -152,5 +145,4 @@ public class AutoTransportConfigureTest {
         message.setText("this is a test");
         producer.send(message);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cf24896..f3c47ae 100755
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
     <zookeeper-version>3.4.6</zookeeper-version>
     <qpid-proton-version>0.13.0</qpid-proton-version>
     <qpid-jms-version>0.9.0</qpid-jms-version>
-    <netty-all-version>4.0.33.Final</netty-all-version>
+    <netty-all-version>4.0.37.Final</netty-all-version>
     <regexp-version>1.3</regexp-version>
     <rome-version>1.0</rome-version>
     <saxon-version>9.5.1-5</saxon-version>


Mime
View raw message