activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject activemq git commit: [AMQ-5517] Runtime support for Jetty 9. Build/compile with Jetty8, but tests pass with Jetty 9 for runtime level support.
Date Tue, 13 Jan 2015 17:49:26 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk bc2e2d9a4 -> 3f8262507


[AMQ-5517] Runtime support for Jetty 9. The build still compiles against Jetty 8, but the tests also pass when run on Jetty 9, giving runtime-level support.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3f826250
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3f826250
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3f826250

Branch: refs/heads/trunk
Commit: 3f826250775c6c022ebad1de57db11770c9cfe1a
Parents: bc2e2d9
Author: Daniel Kulp <dkulp@apache.org>
Authored: Tue Jan 13 12:47:53 2015 -0500
Committer: Daniel Kulp <dkulp@apache.org>
Committed: Tue Jan 13 12:47:53 2015 -0500

----------------------------------------------------------------------
 activemq-http/pom.xml                           |   7 +
 .../transport/SecureSocketConnectorFactory.java | 131 ++++++++-------
 .../transport/SocketConnectorFactory.java       |  17 +-
 .../transport/WebTransportServerSupport.java    |  20 ++-
 .../discovery/http/EmbeddedJettyServer.java     |  22 +--
 .../transport/http/HttpTransportServer.java     |  25 ++-
 .../https/Krb5AndCertsSslSocketConnector.java   |   8 +
 .../activemq/transport/ws/MQTTSocket.java       | 150 -----------------
 .../activemq/transport/ws/StompSocket.java      | 134 ---------------
 .../apache/activemq/transport/ws/WSServlet.java |  62 -------
 .../transport/ws/WSTransportServer.java         |  18 ++-
 .../transport/ws/jetty8/MQTTSocket.java         | 150 +++++++++++++++++
 .../transport/ws/jetty8/StompSocket.java        | 134 +++++++++++++++
 .../activemq/transport/ws/jetty8/WSServlet.java |  62 +++++++
 .../transport/ws/jetty9/MQTTSocket.java         | 161 +++++++++++++++++++
 .../transport/ws/jetty9/StompSocket.java        | 142 ++++++++++++++++
 .../activemq/transport/ws/jetty9/WSServlet.java |  72 +++++++++
 .../activemq/transport/ws/WSTransportTest.java  |  16 +-
 .../transport/wss/WSSTransportTest.java         |  22 +--
 activemq-osgi/pom.xml                           |   2 +-
 activemq-web-console/pom.xml                    |   3 +-
 21 files changed, 905 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml
index d1b6c02..96a22e1 100755
--- a/activemq-http/pom.xml
+++ b/activemq-http/pom.xml
@@ -108,6 +108,13 @@
       <version>2.25.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-server</artifactId>
+      <version>${jetty9-version}</version>
+      <scope>provided</scope>
+      <optional>true</optional>
+  </dependency>
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
index 6c98cac..3ac922a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
@@ -23,8 +23,6 @@ import org.apache.activemq.transport.https.Krb5AndCertsSslSocketConnector;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ssl.SslConnector;
-import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
 public class SecureSocketConnectorFactory extends SocketConnectorFactory {
@@ -44,76 +42,101 @@ public class SecureSocketConnectorFactory extends SocketConnectorFactory {
     private String auth;
 
     private SslContext context;
-
+    private SslContextFactory contextFactory;
+    
+    public SecureSocketConnectorFactory() {
+        
+    }
     public SecureSocketConnectorFactory(SslContext context) {
         this.context = context;
     }
 
+    public SecureSocketConnectorFactory(SslContextFactory contextFactory) {
+        this.contextFactory = contextFactory;
+    }
+
     @Override
     public Connector createConnector(Server server) throws Exception {
-        IntrospectionSupport.setProperties(this, getTransportOptions());
-        SslConnector sslConnector;
-        if (Krb5AndCertsSslSocketConnector.isKrb(auth)) {
-            sslConnector = new Krb5AndCertsSslSocketConnector();
-            ((Krb5AndCertsSslSocketConnector)sslConnector).setMode(auth);
-        } else {
-            sslConnector = new SslSelectChannelConnector();
+        if (getTransportOptions() != null) {
+            IntrospectionSupport.setProperties(this, getTransportOptions());
         }
 
         SSLContext sslContext = context == null ? null : context.getSSLContext();
 
         // Get a reference to the current ssl context factory...
-        SslContextFactory factory = sslConnector.getSslContextFactory();
-
-        if (context != null) {
-
-            // Should not be using this method since it does not use all of the values
-            // from the passed SslContext instance.....
-            factory.setSslContext(sslContext);
 
+        SslContextFactory factory;
+        if (contextFactory == null) {
+            factory = new SslContextFactory();
+            if (context != null) {
+                // Should not be using this method since it does not use all of the values
+                // from the passed SslContext instance.....
+                factory.setSslContext(sslContext);
+
+            } else {
+                if (keyStore != null) {
+                    factory.setKeyStorePath(keyStore);
+                }
+                if (keyStorePassword != null) {
+                    factory.setKeyStorePassword(keyStorePassword);
+                }
+                // if the keyPassword hasn't been set, default it to the
+                // key store password
+                if (keyPassword == null && keyStorePassword != null) {
+                    factory.setKeyStorePassword(keyStorePassword);
+                }
+                if (keyStoreType != null) {
+                    factory.setKeyStoreType(keyStoreType);
+                }
+                if (secureRandomCertficateAlgorithm != null) {
+                    factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm);
+                }
+                if (keyCertificateAlgorithm != null) {
+                    factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm);
+                }
+                if (trustCertificateAlgorithm != null) {
+                    factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm);
+                }
+                if (protocol != null) {
+                    factory.setProtocol(protocol);
+                }
+                if (trustStore != null) {
+                    setTrustStore(factory, trustStore);
+                }
+                if (trustStorePassword != null) {
+                    factory.setTrustStorePassword(trustStorePassword);
+                }
+            }
+            factory.setNeedClientAuth(needClientAuth);
+            factory.setWantClientAuth(wantClientAuth);
         } else {
+            factory = contextFactory;
+        }
 
-            if (keyStore != null) {
-                factory.setKeyStorePath(keyStore);
-            }
-            if (keyStorePassword != null) {
-                factory.setKeyStorePassword(keyStorePassword);
-            }
-            // if the keyPassword hasn't been set, default it to the
-            // key store password
-            if (keyPassword == null && keyStorePassword != null) {
-                factory.setKeyStorePassword(keyStorePassword);
-            }
-            if (keyStoreType != null) {
-                factory.setKeyStoreType(keyStoreType);
-            }
-            if (secureRandomCertficateAlgorithm != null) {
-                factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm);
-            }
-            if (keyCertificateAlgorithm != null) {
-                factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm);
-            }
-            if (trustCertificateAlgorithm != null) {
-                factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm);
-            }
-            if (protocol != null) {
-                factory.setProtocol(protocol);
-            }
-            if (trustStore != null) {
-                factory.setTrustStore(trustStore);
-            }
-            if (trustStorePassword != null) {
-                factory.setTrustStorePassword(trustStorePassword);
+        
+        if ("KRB".equals(auth) || "BOTH".equals(auth)
+            && Server.getVersion().startsWith("8")) {
+            return new Krb5AndCertsSslSocketConnector(factory, auth);
+        } else {
+            try {
+                Class<?> cls = Class.forName("org.eclipse.jetty.server.ssl.SslSelectChannelConnector", true, Server.class.getClassLoader());
+                return (Connector)cls.getConstructor(SslContextFactory.class).newInstance(factory);
+            } catch (Throwable t) {
+                Class<?> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader());
+                Connector connector = (Connector)c.getConstructor(Server.class, SslContextFactory.class).newInstance(server, factory);
+                Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500);
+                connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500);
+                return connector;
             }
-
         }
-
-        factory.setNeedClientAuth(needClientAuth);
-        factory.setWantClientAuth(wantClientAuth);
-
-        return sslConnector;
     }
+    private void setTrustStore(SslContextFactory factory, String trustStore2) throws Exception {
+        String mname = Server.getVersion().startsWith("8") ? "setTrustStore" : "setTrustStorePath";
+        factory.getClass().getMethod(mname, String.class).invoke(factory, trustStore2);
+    }
+
 
+    
     // Properties
     // --------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
index 36b800b..b982f18 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
@@ -21,15 +21,26 @@ import java.util.Map;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
 
 public class SocketConnectorFactory {
 
     private Map<String, Object> transportOptions;
 
     public Connector createConnector(Server server) throws Exception {
-        SelectChannelConnector connector = new SelectChannelConnector();
-        IntrospectionSupport.setProperties(connector, transportOptions, "");
+        Connector connector = null;
+        
+        try {
+            connector = (Connector)Class.forName("org.eclipse.jetty.server.nio.SelectChannelConnector", true, Server.class.getClassLoader()).newInstance();
+        } catch (Throwable t) {
+            Class<?> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader());
+            connector = (Connector)c.getConstructor(Server.class).newInstance(server);
+            Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500);
+            connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500);
+        }
+        System.out.println(transportOptions);
+        if (transportOptions != null) {
+            IntrospectionSupport.setProperties(connector, transportOptions, "");
+        }
         return connector;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
index 28c11a6..a52424e 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
@@ -35,6 +35,18 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
         super(location);
     }
 
+    private <T> void setConnectorProperty(String name, Class<T> type, T value) throws Exception {
+        connector.getClass().getMethod("set" + name, type).invoke(connector, value);
+    }
+    
+    protected void createServer() {
+        server = new Server();
+        try {
+            server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 500l);
+        } catch (Throwable t) {
+            //ignore, jetty 8.  
+        }
+    }
     public URI bind() throws Exception {
 
         URI bind = getBindLocation();
@@ -44,9 +56,11 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
         InetAddress addr = InetAddress.getByName(bindHost);
         host = addr.getCanonicalHostName();
 
-        connector.setHost(host);
-        connector.setPort(bindAddress.getPort());
-        connector.setServer(server);
+        setConnectorProperty("Host", String.class, host);
+        setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort());
+        if (Server.getVersion().startsWith("8")) {
+            connector.setServer(server);
+        }
         server.addConnector(connector);
         if (addr.isAnyLocalAddress()) {
             host = InetAddressUtil.getLocalHostName();

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
index 96389ec..786e3aa 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.discovery.http;
 import java.net.URI;
 
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
@@ -27,13 +26,16 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service {
 
     private HTTPDiscoveryAgent agent;
     private Server server;
-    private SelectChannelConnector connector;
     private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet();
     
     public void start() throws Exception {
         URI uri = new URI(agent.getRegistryURL());
 
-        server = new Server();
+        int port = 80;
+        if( uri.getPort() >=0 ) {
+            port = uri.getPort();
+        }
+        server = new Server(port);
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
         
         context.setContextPath("/");
@@ -42,23 +44,9 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service {
         context.addServlet(holder, "/*");
         server.setHandler(context);
         server.start();
-        
-        int port = 80;
-        if( uri.getPort() >=0 ) {
-            port = uri.getPort();
-        }
-        
-        connector = new SelectChannelConnector();
-        connector.setPort(port);
-        server.addConnector(connector);
-        connector.start();
     }
 
     public void stop() throws Exception {
-        if( connector!=null ) {
-            connector.stop();
-            connector = null;
-        }
         if( server!=null ) {
             server.stop();
             server = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
index 0c7ecd9..8ae7874 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
@@ -27,8 +27,8 @@ import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.transport.xstream.XStreamWireFormat;
 import org.apache.activemq.util.ServiceStopper;
 import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.GzipHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.slf4j.Logger;
@@ -77,7 +77,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
 
     @Override
     protected void doStart() throws Exception {
-        server = new Server();
+        createServer();
         if (connector == null) {
             connector = socketConnectorFactory.createConnector(server);
         }
@@ -96,8 +96,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
         contextHandler.setAttribute("transportFactory", transportFactory);
         contextHandler.setAttribute("transportOptions", transportOptions);
 
-        GzipHandler gzipHandler = new GzipHandler();
-        contextHandler.setHandler(gzipHandler);
+        addGzipHandler(contextHandler);
 
         server.start();
 
@@ -105,8 +104,9 @@ public class HttpTransportServer extends WebTransportServerSupport {
         // was set to zero so that we report the actual port we are listening on.
 
         int port = boundTo.getPort();
-        if (connector.getLocalPort() != -1) {
-            port = connector.getLocalPort();
+        int p2 = getConnectorLocalPort();
+        if (p2 != -1) {
+            port = p2;
         }
 
         setConnectURI(new URI(boundTo.getScheme(),
@@ -118,6 +118,19 @@ public class HttpTransportServer extends WebTransportServerSupport {
                               boundTo.getFragment()));
     }
 
+    private int getConnectorLocalPort() throws Exception {
+        return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector);
+    }
+    private void addGzipHandler(ServletContextHandler contextHandler) throws Exception {
+        Handler handler = null;
+        try {
+            handler = (Handler)Class.forName("org.eclipse.jetty.server.handler.GzipHandler", true, Handler.class.getClassLoader()).newInstance();
+        } catch (Throwable t) {
+            handler = (Handler)Class.forName("org.eclipse.jetty.servlets.gzip.GzipHandler", true, Handler.class.getClassLoader()).newInstance();
+        }
+        contextHandler.setHandler(handler);
+    }
+
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         Server temp = server;

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
index 858c9ad..cf36122 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
@@ -68,6 +68,14 @@ public class Krb5AndCertsSslSocketConnector extends SslSocketConnector {
         useCerts = true;
         setPasswords();
     }
+    public Krb5AndCertsSslSocketConnector(SslContextFactory f, String auth) {
+        // By default, stick to cert based authentication
+        super(f);
+        useKrb = false;
+        useCerts = true;
+        setPasswords();
+        setMode(auth);
+    }
 
     public static boolean isKrb(String mode) {
         return mode == MODE.KRB.toString() || mode == MODE.BOTH.toString();

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
deleted file mode 100644
index 047c459..0000000
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.ws;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.transport.TransportSupport;
-import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
-import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
-import org.apache.activemq.transport.mqtt.MQTTTransport;
-import org.apache.activemq.transport.mqtt.MQTTWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.fusesource.mqtt.codec.DISCONNECT;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.CountDownLatch;
-
-public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
-    Connection outbound;
-    MQTTProtocolConverter protocolConverter = null;
-    MQTTWireFormat wireFormat = new MQTTWireFormat();
-    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
-    private BrokerService brokerService;
-
-    @Override
-    public void onMessage(byte[] bytes, int offset, int length) {
-        if (!transportStartedAtLeastOnce()) {
-            LOG.debug("Waiting for StompSocket to be properly started...");
-            try {
-                socketTransportStarted.await();
-            } catch (InterruptedException e) {
-                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
-            }
-        }
-
-        try {
-            MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
-            getProtocolConverter().onMQTTCommand(frame);
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
-
-    private MQTTProtocolConverter getProtocolConverter() {
-        if( protocolConverter == null ) {
-            protocolConverter = new MQTTProtocolConverter(this, brokerService);
-        }
-        return protocolConverter;
-    }
-
-    @Override
-    public void onOpen(Connection connection) {
-        this.outbound = connection;
-    }
-
-    @Override
-    public void onClose(int closeCode, String message) {
-        try {
-            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
-        } catch (Exception e) {
-            LOG.warn("Failed to close WebSocket", e);
-        }
-    }
-
-    protected void doStart() throws Exception {
-        socketTransportStarted.countDown();
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-    }
-
-    private boolean transportStartedAtLeastOnce() {
-        return socketTransportStarted.getCount() == 0;
-    }
-
-    @Override
-    public int getReceiveCounter() {
-        return 0;
-    }
-
-    @Override
-    public String getRemoteAddress() {
-        return "MQTTSocket_" + this.hashCode();
-    }
-
-    @Override
-    public void oneway(Object command) throws IOException {
-        try {
-            getProtocolConverter().onActiveMQCommand((Command) command);
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
-
-    @Override
-    public void sendToActiveMQ(Command command) {
-        doConsume(command);
-    }
-
-    @Override
-    public void sendToMQTT(MQTTFrame command) throws IOException {
-        ByteSequence bytes = wireFormat.marshal(command);
-        outbound.sendMessage(bytes.getData(), 0, bytes.getLength());
-    }
-
-    @Override
-    public X509Certificate[] getPeerCertificates() {
-        return new X509Certificate[0];
-    }
-
-    @Override
-    public MQTTInactivityMonitor getInactivityMonitor() {
-        return null;
-    }
-
-    @Override
-    public MQTTWireFormat getWireFormat() {
-        return wireFormat;
-    }
-
-    @Override
-    public void setBrokerService(BrokerService brokerService) {
-        this.brokerService = brokerService;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
deleted file mode 100644
index b0da09a..0000000
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.ws;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.activemq.command.Command;
-import org.apache.activemq.transport.TransportSupport;
-import org.apache.activemq.transport.stomp.ProtocolConverter;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
-import org.apache.activemq.transport.stomp.StompInactivityMonitor;
-import org.apache.activemq.transport.stomp.StompTransport;
-import org.apache.activemq.transport.stomp.StompWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements web socket and mediates between servlet and the broker
- */
-class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport {
-    private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
-
-    Connection outbound;
-    ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
-    StompWireFormat wireFormat = new StompWireFormat();
-    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
-    private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
-
-    @Override
-    public void onOpen(Connection connection) {
-        this.outbound = connection;
-    }
-
-    @Override
-    public void onClose(int closeCode, String message) {
-        try {
-            protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
-        } catch (Exception e) {
-            LOG.warn("Failed to close WebSocket", e);
-        }
-    }
-
-    @Override
-    public void onMessage(String data) {
-
-        if (!transportStartedAtLeastOnce()) {
-            LOG.debug("Waiting for StompSocket to be properly started...");
-            try {
-                socketTransportStarted.await();
-            } catch (InterruptedException e) {
-                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
-            }
-        }
-
-
-        try {
-            protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
-
-    private boolean transportStartedAtLeastOnce() {
-        return socketTransportStarted.getCount() == 0;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        socketTransportStarted.countDown();
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-    }
-
-    @Override
-    public int getReceiveCounter() {
-        return 0;
-    }
-
-    @Override
-    public String getRemoteAddress() {
-        return "StompSocket_" + this.hashCode();
-    }
-
-    @Override
-    public void oneway(Object command) throws IOException {
-        try {
-            protocolConverter.onActiveMQCommand((Command)command);
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
-
-    @Override
-    public void sendToActiveMQ(Command command) {
-        doConsume(command);
-    }
-
-    @Override
-    public void sendToStomp(StompFrame command) throws IOException {
-        outbound.sendMessage(command.format());
-    }
-
-    @Override
-    public StompInactivityMonitor getInactivityMonitor() {
-        return stompInactivityMonitor;
-    }
-
-    @Override
-    public StompWireFormat getWireFormat() {
-        return this.wireFormat;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java
deleted file mode 100644
index d0ed22d..0000000
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.transport.ws;
-
-import java.io.IOException;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketServlet;
-
-/**
- * Handle connection upgrade requests and creates web sockets
- */
-public class WSServlet extends WebSocketServlet {
-    private static final long serialVersionUID = -4716657876092884139L;
-
-    private TransportAcceptListener listener;
-
-    public void init() throws ServletException {
-        super.init();
-        listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
-        if (listener == null) {
-            throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
-        }
-    }
-
-    protected void doGet(HttpServletRequest request, HttpServletResponse response)
-        throws ServletException ,IOException  {
-        getServletContext().getNamedDispatcher("default").forward(request,response);
-    }
-
-    @Override
-    public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
-        WebSocket socket;
-        if (protocol != null && protocol.startsWith("mqtt")) {
-            socket = new MQTTSocket();
-        } else {
-            socket = new StompSocket();
-        }
-        listener.onAccept((Transport)socket);
-        return socket;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
index 4b75c9a..e26027d 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -49,7 +49,7 @@ public class WSTransportServer extends WebTransportServerSupport {
 
     @Override
     protected void doStart() throws Exception {
-        server = new Server();
+        createServer();
 
         if (connector == null) {
             connector = socketConnectorFactory.createConnector(server);
@@ -69,7 +69,11 @@ public class WSTransportServer extends WebTransportServerSupport {
             }
         }
 
-        holder.setServlet(new WSServlet());
+        if (Server.getVersion().startsWith("8")) {
+            holder.setServlet(new org.apache.activemq.transport.ws.jetty8.WSServlet());
+        } else {
+            holder.setServlet(new org.apache.activemq.transport.ws.jetty9.WSServlet());
+        }
         contextHandler.addServlet(holder, "/");
 
         contextHandler.setAttribute("acceptListener", getAcceptListener());
@@ -79,9 +83,9 @@ public class WSTransportServer extends WebTransportServerSupport {
         // Update the Connect To URI with our actual location in case the configured port
         // was set to zero so that we report the actual port we are listening on.
 
-        int port = boundTo.getPort();
-        if (connector.getLocalPort() != -1) {
-            port = connector.getLocalPort();
+        int port = getConnectorLocalPort(); 
+        if (port == -1) {
+            port = boundTo.getPort();
         }
 
         setConnectURI(new URI(boundTo.getScheme(),
@@ -95,6 +99,10 @@ public class WSTransportServer extends WebTransportServerSupport {
         LOG.info("Listening for connections at {}", getConnectURI());
     }
 
+    private int getConnectorLocalPort() throws Exception {
+        return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector);
+    }
+    
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         Server temp = server;

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java
new file mode 100644
index 0000000..58e9134
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws.jetty8;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTTransport;
+import org.apache.activemq.transport.mqtt.MQTTWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+
+public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
+    Connection outbound;
+    MQTTProtocolConverter protocolConverter = null;
+    MQTTWireFormat wireFormat = new MQTTWireFormat();
+    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    private BrokerService brokerService;
+
+    @Override
+    public void onMessage(byte[] bytes, int offset, int length) {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for StompSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        try {
+            MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
+            getProtocolConverter().onMQTTCommand(frame);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    private MQTTProtocolConverter getProtocolConverter() {
+        if( protocolConverter == null ) {
+            protocolConverter = new MQTTProtocolConverter(this, brokerService);
+        }
+        return protocolConverter;
+    }
+
+    @Override
+    public void onOpen(Connection connection) {
+        this.outbound = connection;
+    }
+
+    @Override
+    public void onClose(int closeCode, String message) {
+        try {
+            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+        } catch (Exception e) {
+            LOG.warn("Failed to close WebSocket", e);
+        }
+    }
+
+    protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+    }
+
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return 0;
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return "MQTTSocket_" + this.hashCode();
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            getProtocolConverter().onActiveMQCommand((Command) command);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    public void sendToActiveMQ(Command command) {
+        doConsume(command);
+    }
+
+    @Override
+    public void sendToMQTT(MQTTFrame command) throws IOException {
+        ByteSequence bytes = wireFormat.marshal(command);
+        outbound.sendMessage(bytes.getData(), 0, bytes.getLength());
+    }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return new X509Certificate[0];
+    }
+
+    @Override
+    public MQTTInactivityMonitor getInactivityMonitor() {
+        return null;
+    }
+
+    @Override
+    public MQTTWireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
new file mode 100644
index 0000000..dba3ca9
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws.jetty8;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.stomp.ProtocolConverter;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompInactivityMonitor;
+import org.apache.activemq.transport.stomp.StompTransport;
+import org.apache.activemq.transport.stomp.StompWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements web socket and mediates between servlet and the broker
+ */
+class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport {
+    private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
+
+    Connection outbound;
+    ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
+    StompWireFormat wireFormat = new StompWireFormat();
+    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
+
+    @Override
+    public void onOpen(Connection connection) {
+        this.outbound = connection;
+    }
+
+    @Override
+    public void onClose(int closeCode, String message) {
+        try {
+            protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
+        } catch (Exception e) {
+            LOG.warn("Failed to close WebSocket", e);
+        }
+    }
+
+    @Override
+    public void onMessage(String data) {
+
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for StompSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+
+        try {
+            protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return 0;
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return "StompSocket_" + this.hashCode();
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            protocolConverter.onActiveMQCommand((Command)command);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    public void sendToActiveMQ(Command command) {
+        doConsume(command);
+    }
+
+    @Override
+    public void sendToStomp(StompFrame command) throws IOException {
+        outbound.sendMessage(command.format());
+    }
+
+    @Override
+    public StompInactivityMonitor getInactivityMonitor() {
+        return stompInactivityMonitor;
+    }
+
+    @Override
+    public StompWireFormat getWireFormat() {
+        return this.wireFormat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java
new file mode 100644
index 0000000..d0f7b19
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.ws.jetty8;
+
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+
+/**
+ * Handle connection upgrade requests and creates web sockets
+ */
+public class WSServlet extends WebSocketServlet {
+    private static final long serialVersionUID = -4716657876092884139L;
+
+    private TransportAcceptListener listener;
+
+    public void init() throws ServletException {
+        super.init();
+        listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
+        if (listener == null) {
+            throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
+        }
+    }
+
+    protected void doGet(HttpServletRequest request, HttpServletResponse response)
+        throws ServletException ,IOException  {
+        getServletContext().getNamedDispatcher("default").forward(request,response);
+    }
+
+    @Override
+    public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
+        WebSocket socket;
+        if (protocol != null && protocol.startsWith("mqtt")) {
+            socket = new MQTTSocket();
+        } else {
+            socket = new StompSocket();
+        }
+        listener.onAccept((Transport)socket);
+        return socket;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
new file mode 100644
index 0000000..4d7dac3
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws.jetty9;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTTransport;
+import org.apache.activemq.transport.mqtt.MQTTWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+
+public class MQTTSocket  extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
+    Session session;
+    MQTTProtocolConverter protocolConverter = null;
+    MQTTWireFormat wireFormat = new MQTTWireFormat();
+    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    private BrokerService brokerService;
+
+    private MQTTProtocolConverter getProtocolConverter() {
+        if( protocolConverter == null ) {
+            protocolConverter = new MQTTProtocolConverter(this, brokerService);
+        }
+        return protocolConverter;
+    }
+
+    protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+    }
+
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return 0;
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return "MQTTSocket_" + this.hashCode();
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            getProtocolConverter().onActiveMQCommand((Command) command);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    public void sendToActiveMQ(Command command) {
+        doConsume(command);
+    }
+
+    @Override
+    public void sendToMQTT(MQTTFrame command) throws IOException {
+        ByteSequence bytes = wireFormat.marshal(command);
+        session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
+    }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return new X509Certificate[0];
+    }
+
+    @Override
+    public MQTTInactivityMonitor getInactivityMonitor() {
+        return null;
+    }
+
+    @Override
+    public MQTTWireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    public void onWebSocketBinary(byte[] bytes, int offset, int length) {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for StompSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        try {
+            MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
+            getProtocolConverter().onMQTTCommand(frame);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    public void onWebSocketClose(int arg0, String arg1) {
+        try {
+            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+        } catch (Exception e) {
+            LOG.warn("Failed to close WebSocket", e);
+        }        
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable arg0) {
+        
+    }
+
+    @Override
+    public void onWebSocketText(String arg0) {        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
new file mode 100644
index 0000000..811f228
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws.jetty9;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.stomp.ProtocolConverter;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompInactivityMonitor;
+import org.apache.activemq.transport.stomp.StompTransport;
+import org.apache.activemq.transport.stomp.StompWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements web socket and mediates between servlet and the broker
+ */
+class StompSocket extends TransportSupport implements WebSocketListener, StompTransport {
+    private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
+
+    Session session;
+    ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
+    StompWireFormat wireFormat = new StompWireFormat();
+    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
+
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return 0;
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return "StompSocket_" + this.hashCode();
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            protocolConverter.onActiveMQCommand((Command)command);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    public void sendToActiveMQ(Command command) {
+        doConsume(command);
+    }
+
+    @Override
+    public void sendToStomp(StompFrame command) throws IOException {
+        session.getRemote().sendString(command.format());
+    }
+
+    @Override
+    public StompInactivityMonitor getInactivityMonitor() {
+        return stompInactivityMonitor;
+    }
+
+    @Override
+    public StompWireFormat getWireFormat() {
+        return this.wireFormat;
+    }
+
+    @Override
+    public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
+    }
+
+    @Override
+    public void onWebSocketClose(int arg0, String arg1) {
+        try {
+            protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
+        } catch (Exception e) {
+            LOG.warn("Failed to close WebSocket", e);
+        }
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable arg0) {       
+    }
+
+    @Override
+    public void onWebSocketText(String data) {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for StompSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        try {
+            protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
new file mode 100644
index 0000000..15927b1
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.ws.jetty9;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Handles connection upgrade requests and creates web sockets
+ */
+public class WSServlet extends WebSocketServlet {
+    private static final long serialVersionUID = -4716657876092884139L;
+
+    private TransportAcceptListener listener;
+
+    public void init() throws ServletException {
+        super.init();
+        listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
+        if (listener == null) {
+            throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
+        }
+    }
+
+    protected void doGet(HttpServletRequest request, HttpServletResponse response)
+        throws ServletException ,IOException  {
+        getServletContext().getNamedDispatcher("default").forward(request,response);
+    }
+
+    
+    public void configure(WebSocketServletFactory factory) {
+        factory.setCreator(new WebSocketCreator() {
+            @Override
+            public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+                WebSocketListener socket;
+                if (req.getSubProtocols().contains("mqtt")) {
+                    socket = new MQTTSocket();
+                } else {
+                    socket = new StompSocket();
+                }
+                return socket;
+            }
+        });
+        
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
index 92bc1cb..140356e 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
@@ -32,16 +32,18 @@ import javax.net.ServerSocketFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.spring.SpringSslContext;
+import org.apache.activemq.transport.SocketConnectorFactory;
 import org.apache.activemq.transport.stomp.StompConnection;
 import org.apache.activemq.util.Wait;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.webapp.WebAppContext;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+
 import org.openqa.selenium.By;
 import org.openqa.selenium.WebDriver;
 import org.openqa.selenium.WebElement;
@@ -97,7 +99,9 @@ public class WSTransportTest {
         Server server = new Server();
 
         Connector connector = createJettyConnector(server);
-        connector.setServer(server);
+        if (Server.getVersion().startsWith("8")) {
+            connector.setServer(server);
+        }
 
         WebAppContext context = new WebAppContext();
         context.setResourceBase("src/test/webapp");
@@ -129,10 +133,10 @@ public class WSTransportTest {
         return proxyPort;
     }
 
-    protected Connector createJettyConnector(Server server) {
-        SelectChannelConnector connector = new SelectChannelConnector();
-        connector.setPort(getProxyPort());
-        return connector;
+    protected Connector createJettyConnector(Server server) throws Exception {
+        Connector c = new SocketConnectorFactory().createConnector(server);
+        c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
+        return c;
     }
 
     protected void stopBroker() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
index ef61140..36b33f6 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
@@ -16,23 +16,23 @@
  */
 package org.apache.activemq.transport.wss;
 
+import org.apache.activemq.transport.SecureSocketConnectorFactory;
 import org.apache.activemq.transport.ws.WSTransportTest;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ssl.SslSocketConnector;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
 
 public class WSSTransportTest extends WSTransportTest {
     @Override
-    protected Connector createJettyConnector(Server server) {
-        SslSocketConnector sslConnector = new SslSocketConnector();
-        SslContextFactory contextFactory = sslConnector.getSslContextFactory();
-        contextFactory.setKeyStorePath("src/test/resources/server.keystore");
-        contextFactory.setKeyStorePassword("password");
-        contextFactory.setTrustStore("src/test/resources/client.keystore");
-        contextFactory.setTrustStorePassword("password");
-        sslConnector.setPort(getProxyPort());
-        return sslConnector;
+    protected Connector createJettyConnector(Server server) throws Exception {
+        SecureSocketConnectorFactory sscf = new SecureSocketConnectorFactory();
+        sscf.setKeyStore("src/test/resources/server.keystore");
+        sscf.setKeyStorePassword("password");
+        sscf.setTrustStore("src/test/resources/client.keystore");
+        sscf.setTrustStorePassword("password");
+        
+        Connector c = sscf.createConnector(server);
+        c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
+        return c;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-osgi/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index c56877a..74be454 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -52,7 +52,7 @@
       com.fasterxml.jackson*;resolution:=optional,
       org.codehaus.jettison*;resolution:=optional,
       org.jasypt*;resolution:=optional,
-      org.eclipse.jetty*;resolution:=optional,
+      org.eclipse.jetty*;resolution:=optional;version="[8.1,10)",
       org.apache.zookeeper*;resolution:=optional,
       org.fusesource.leveldbjni*;resolution:=optional,
       org.fusesource.hawtjni*;resolution:=optional,

http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-web-console/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml
index c14c40f..8002444 100755
--- a/activemq-web-console/pom.xml
+++ b/activemq-web-console/pom.xml
@@ -32,6 +32,7 @@
 
   <properties>
     <jetty.port>8080</jetty.port>
+    <jetty.maven.groupid>org.mortbay.jetty</jetty.maven.groupid>
   </properties>
 
   <build>
@@ -51,7 +52,7 @@
         </configuration>
       </plugin>
       <plugin>
-        <groupId>org.mortbay.jetty</groupId>
+          <groupId>${jetty.maven.groupid}</groupId>
         <artifactId>jetty-maven-plugin</artifactId>
         <version>${jetty-version}</version>
         <configuration>


Mime
View raw message