activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6339
Date Thu, 30 Jun 2016 19:14:22 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
new file mode 100644
index 0000000..1b604fe
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * Transport for communicating over WebSockets
+ */
+public class NettyWSTransport implements NettyTransport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
+
+    private static final int QUIET_PERIOD = 20;
+    private static final int SHUTDOWN_TIMEOUT = 100;
+
+    protected Bootstrap bootstrap;
+    protected EventLoopGroup group;
+    protected Channel channel;
+    protected NettyTransportListener listener;
+    protected NettyTransportOptions options;
+    protected final URI remote;
+    protected boolean secure;
+
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private ChannelPromise handshakeFuture;
+    private IOException failureCause;
+    private Throwable pendingFailure;
+
+    /**
+     * Create a new transport instance without a transport listener.
+     *
+     * Delegates to the three argument constructor passing a null listener. A
+     * listener must be assigned through setTransportListener before connect()
+     * is called, because connect() throws IllegalStateException when the
+     * listener is still null.
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * The transport is treated as secure when the scheme of the remote URI is
+     * "wss" (compared ignoring case). No connection is attempted here; that is
+     * done by connect().
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to, its scheme is
+     *        read here so it must not be null.
+     * @param options
+     *        the transport options used to configure the socket connection, when
+     *        null a default instance is selected lazily by getTransportOptions().
+     */
+    public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+        // Only the "wss" scheme marks this transport as SSL enabled.
+        this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
+    }
+
+    /**
+     * Connects to the remote peer and blocks until the WebSocket handshake has
+     * completed or the attempt has failed.
+     *
+     * A single threaded NIO event loop group and a Bootstrap are created, the
+     * channel is configured through configureChannel and configureNetty, and the
+     * calling thread then waits first for the socket connect and afterwards for
+     * the WebSocket handshake promise created by the channel handler.
+     *
+     * On failure the channel (if one was recorded) is closed, the event loop
+     * group is shut down and the recorded failure is thrown. On success any error
+     * that was held back while connecting is fired into the pipeline from the
+     * channel's event loop.
+     *
+     * @throws IOException
+     *         if the connection attempt fails, is cancelled, or the calling thread
+     *         is interrupted while waiting (the interrupt status is preserved).
+     * @throws IllegalStateException
+     *         if no transport listener has been set.
+     */
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup(1);
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                configureChannel(connectedChannel);
+            }
+        });
+
+        configureNetty(bootstrap, getTransportOptions());
+
+        ChannelFuture future;
+        try {
+            future = bootstrap.connect(getRemoteHost(), getRemotePort());
+            future.addListener(new ChannelFutureListener() {
+
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        handleConnected(future.channel());
+                    } else if (future.isCancelled()) {
+                        connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+                    } else {
+                        connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+                    }
+                }
+            });
+
+            // Use await() and not sync() here: sync() rethrows the connect failure
+            // from this point, which would skip the resource cleanup below and leave
+            // the event loop group running.
+            future.await();
+
+            if (handshakeFuture != null) {
+                // Now wait for WS protocol level handshake completion, a failed connect
+                // also completes this promise via connectionFailed.
+                handshakeFuture.await();
+            } else if (!future.isSuccess()) {
+                // The handshake promise is only created once the handler has been added
+                // to the pipeline, if that never happened report the connect failure.
+                failureCause = IOExceptionSupport.create(future.cause());
+            }
+        } catch (InterruptedException ex) {
+            LOG.debug("Transport connection attempt was interrupted.");
+            // Restore the interrupt status so that callers can still observe it.
+            Thread.currentThread().interrupt();
+            failureCause = IOExceptionSupport.create(ex);
+        }
+
+        if (failureCause != null) {
+            // Close out any Netty resources now as they are no longer needed.
+            if (channel != null) {
+                channel.close().syncUninterruptibly();
+                channel = null;
+            }
+            if (group != null) {
+                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                group = null;
+            }
+
+            throw failureCause;
+        } else {
+            // Connected, allow any held async error to fire now and close the transport.
+            channel.eventLoop().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    if (pendingFailure != null) {
+                        channel.pipeline().fireExceptionCaught(pendingFailure);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * @return the current value of the connected flag, which is set by
+     *         connectionEstablished and cleared on close, on connection failure and
+     *         when the channel goes inactive or reports an exception.
+     */
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    /**
+     * @return true if the remote URI given at construction used the "wss" scheme.
+     */
+    @Override
+    public boolean isSSL() {
+        return secure;
+    }
+
+    /**
+     * Closes this transport. Only the first call has any effect: it clears the
+     * connected flag, closes the channel (waiting for the close to finish) when
+     * one exists and starts a graceful shutdown of the event loop group when one
+     * exists.
+     */
+    @Override
+    public void close() throws IOException {
+        // Already closed by an earlier call, nothing left to release.
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+
+        connected.set(false);
+
+        if (channel != null) {
+            channel.close().syncUninterruptibly();
+        }
+
+        if (group != null) {
+            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Allocates an I/O buffer from the channel's allocator, passing the given
+     * size as both capacity arguments.
+     *
+     * @param size
+     *        the capacity requested for the buffer.
+     *
+     * @return a new buffer obtained from the channel's allocator.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    @Override
+    public ByteBuf allocateSendBuffer(int size) throws IOException {
+        checkConnected();
+        return channel.alloc().ioBuffer(size, size);
+    }
+
+    /**
+     * Writes the readable bytes of the given buffer to the channel wrapped in a
+     * binary WebSocket frame and flushes it. A buffer with no readable bytes is
+     * ignored.
+     *
+     * @param output
+     *        the buffer whose contents should be sent.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+
+        final int pendingBytes = output.readableBytes();
+        if (pendingBytes != 0) {
+            LOG.trace("Attempted write of: {} bytes", pendingBytes);
+            channel.writeAndFlush(new BinaryWebSocketFrame(output));
+        }
+    }
+
+    /**
+     * @return the listener that receives events from this transport, may be null
+     *         if none has been set yet.
+     */
+    @Override
+    public NettyTransportListener getTransportListener() {
+        return listener;
+    }
+
+    /**
+     * Sets the listener that receives events from this transport. A listener
+     * must be set before connect() is called.
+     *
+     * @param listener
+     *        the listener to notify of transport events.
+     */
+    @Override
+    public void setTransportListener(NettyTransportListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Returns the options for this transport. When none were supplied a shared
+     * default instance is selected on first use and remembered: the SSL options
+     * instance for a secure transport, the plain options instance otherwise.
+     *
+     * @return the transport options in use, never null.
+     */
+    @Override
+    public NettyTransportOptions getTransportOptions() {
+        if (options != null) {
+            return options;
+        }
+
+        options = isSSL() ? NettyTransportSslOptions.INSTANCE : NettyTransportOptions.INSTANCE;
+        return options;
+    }
+
+    /**
+     * @return the remote URI this transport was created with.
+     */
+    @Override
+    public URI getRemoteLocation() {
+        return remote;
+    }
+
+    /**
+     * Looks up the local principal from the SSL session of the SslHandler that
+     * is installed in the channel's pipeline.
+     *
+     * @return the local principal reported by the SSL session.
+     *
+     * @throws UnsupportedOperationException if this transport is not secure.
+     */
+    @Override
+    public Principal getLocalPrincipal() {
+        if (isSSL()) {
+            return channel.pipeline()
+                          .get(SslHandler.class)
+                          .engine()
+                          .getSession()
+                          .getLocalPrincipal();
+        }
+
+        throw new UnsupportedOperationException("Not connected to a secure channel");
+    }
+
+    //----- Internal implementation details, can be overridden as needed --//
+
+    /**
+     * @return the host portion of the remote URI, used as the connect target.
+     */
+    protected String getRemoteHost() {
+        return remote.getHost();
+    }
+
+    /**
+     * Determines the port to connect to. A positive port in the remote URI is
+     * used as given, otherwise the default SSL port of the SSL options is used
+     * for a secure transport and the default TCP port of the transport options
+     * for a non-secure one.
+     *
+     * @return the port used for the connection attempt.
+     */
+    protected int getRemotePort() {
+        final int explicitPort = remote.getPort();
+        if (explicitPort > 0) {
+            return explicitPort;
+        }
+
+        // No usable port in the URI, fall back to the configured default.
+        return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
+    }
+
+    /**
+     * Applies the socket level settings from the transport options to the given
+     * Bootstrap. Send buffer size, receive buffer size and traffic class are only
+     * applied when their configured value is not -1.
+     *
+     * @param bootstrap
+     *        the Bootstrap that is being prepared for the connection attempt.
+     * @param options
+     *        the options whose values are copied onto the Bootstrap.
+     */
+    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+        // Settings that are always applied.
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay())
+                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout())
+                 .option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive())
+                 .option(ChannelOption.SO_LINGER, options.getSoLinger())
+                 .option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        // Settings that are skipped while left at -1.
+        final int sendBufferSize = options.getSendBufferSize();
+        if (sendBufferSize != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, sendBufferSize);
+        }
+
+        final int receiveBufferSize = options.getReceiveBufferSize();
+        if (receiveBufferSize != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, receiveBufferSize);
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(receiveBufferSize));
+        }
+
+        final int trafficClass = options.getTrafficClass();
+        if (trafficClass != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, trafficClass);
+        }
+    }
+
+    /**
+     * Builds the handler pipeline for a newly created channel. For a secure
+     * transport an SslHandler is added first, followed in all cases by the HTTP
+     * client codec, an HTTP object aggregator and the WebSocket transport handler.
+     *
+     * @param channel
+     *        the channel whose pipeline is being populated.
+     *
+     * @throws Exception if an error occurs while configuring the channel.
+     */
+    protected void configureChannel(final Channel channel) throws Exception {
+        if (isSSL()) {
+            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+            // For secure transports the connection is only reported as established
+            // (or failed) once the SSL handshake has finished.
+            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                    if (future.isSuccess()) {
+                        LOG.trace("SSL Handshake has completed: {}", channel);
+                        connectionEstablished(channel);
+                    } else {
+                        LOG.trace("SSL Handshake has failed: {}", channel);
+                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+                    }
+                }
+            });
+
+            channel.pipeline().addLast(sslHandler);
+        }
+
+        // HTTP handling needed for the WebSocket upgrade, responses are aggregated
+        // up to 8192 bytes of content.
+        channel.pipeline().addLast(new HttpClientCodec());
+        channel.pipeline().addLast(new HttpObjectAggregator(8192));
+        channel.pipeline().addLast(new NettyTcpTransportHandler());
+    }
+
+    /**
+     * Called when the socket level connect has succeeded. A non-secure transport
+     * is marked as established right away, a secure transport is left alone here
+     * because it is marked as established by the SSL handshake listener.
+     *
+     * @param channel
+     *        the channel that has just connected.
+     *
+     * @throws Exception if an error occurs while handling the event.
+     */
+    protected void handleConnected(final Channel channel) throws Exception {
+        if (!isSSL()) {
+            connectionEstablished(channel);
+        }
+    }
+
+    //----- State change handlers and checks ---------------------------------//
+
+    /**
+     * Called when the transport has successfully connected and is ready for use.
+     * Records the channel and sets the connected flag.
+     *
+     * @param connectedChannel
+     *      The Channel instance that is now connected.
+     */
+    protected void connectionEstablished(Channel connectedChannel) {
+        LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
+        channel = connectedChannel;
+        connected.set(true);
+    }
+
+    /**
+     * Called when the transport connection failed and an error should be returned.
+     * Records the failure so that connect() can throw it, remembers the failed
+     * channel so that connect() can close it, clears the connected flag and fails
+     * the WebSocket handshake promise to release the thread waiting in connect().
+     *
+     * @param failedChannel
+     *      The Channel instance that failed.
+     * @param cause
+     *      An IOException that describes the cause of the failed connection.
+     */
+    protected void connectionFailed(Channel failedChannel, IOException cause) {
+        failureCause = IOExceptionSupport.create(cause);
+        channel = failedChannel;
+        connected.set(false);
+        // NOTE(review): assumes the handshake promise exists and is not yet complete
+        // at this point - confirm, setFailure would otherwise throw.
+        handshakeFuture.setFailure(cause);
+    }
+
+    /**
+     * @return the transport options cast to the SSL options type, the cast fails
+     *         with a ClassCastException if plain options were configured.
+     */
+    private NettyTransportSslOptions getSslOptions() {
+        return (NettyTransportSslOptions) getTransportOptions();
+    }
+
+    /**
+     * Verifies that the transport is currently connected.
+     *
+     * @throws IOException if the connected flag is not set.
+     */
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    /**
+     * Channel handler that performs the client side WebSocket handshake and then
+     * forwards channel events and incoming binary frames to the transport listener.
+     */
+    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
+
+        private final WebSocketClientHandshaker handshaker;
+
+        /**
+         * Creates the handler along with a version 13 WebSocket handshaker for the
+         * remote URI that requests the "amqp" sub-protocol.
+         */
+        public NettyTcpTransportHandler() {
+            handshaker = WebSocketClientHandshakerFactory.newHandshaker(
+                remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
+        }
+
+        /**
+         * Creates the promise that connect() waits on for handshake completion.
+         */
+        @Override
+        public void handlerAdded(ChannelHandlerContext context) {
+            LOG.trace("Handler has become added! Channel is {}", context.channel());
+            handshakeFuture = context.newPromise();
+        }
+
+        /**
+         * Starts the WebSocket handshake once the channel is active.
+         */
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.trace("Channel has become active! Channel is {}", context.channel());
+            handshaker.handshake(context.channel());
+        }
+
+        /**
+         * Notifies the listener that the transport closed, but only if it was
+         * connected and close() has not been called on the transport.
+         */
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+            if (connected.compareAndSet(true, false) && !closed.get()) {
+                LOG.trace("Firing onTransportClosed listener");
+                listener.onTransportClosed();
+            }
+        }
+
+        /**
+         * Reports an error to the listener when the transport was connected and not
+         * closed, preferring an earlier held failure over the given cause. Otherwise
+         * the first failure seen is held for later dispatch and a still pending
+         * handshake promise is failed so that connect() stops waiting.
+         */
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
+            LOG.trace("Error Stack: ", cause);
+            if (connected.compareAndSet(true, false) && !closed.get()) {
+                LOG.trace("Firing onTransportError listener");
+                if (pendingFailure != null) {
+                    listener.onTransportError(pendingFailure);
+                } else {
+                    listener.onTransportError(cause);
+                }
+            } else {
+                // Hold the first failure for later dispatch if connect succeeds.
+                // This will then trigger disconnect using the first error reported.
+                // Only store it while nothing is held yet so later errors cannot
+                // replace the first one.
+                if (pendingFailure == null) {
+                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+                    pendingFailure = cause;
+                }
+
+                if (!handshakeFuture.isDone()) {
+                    handshakeFuture.setFailure(cause);
+                }
+            }
+        }
+
+        /**
+         * Handles incoming messages. Until the handshake is complete the message is
+         * treated as the HTTP handshake response. Afterwards binary frames are
+         * passed to the listener, text frames are rejected with an IOException fired
+         * into the pipeline, pong frames are only logged and a close frame closes
+         * the channel.
+         */
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
+            LOG.trace("New data read: incoming: {}", message);
+
+            Channel ch = ctx.channel();
+            if (!handshaker.isHandshakeComplete()) {
+                handshaker.finishHandshake(ch, (FullHttpResponse) message);
+                LOG.info("WebSocket Client connected! {}", ctx.channel());
+                handshakeFuture.setSuccess();
+                return;
+            }
+
+            // We shouldn't get this since we handle the handshake previously.
+            if (message instanceof FullHttpResponse) {
+                FullHttpResponse response = (FullHttpResponse) message;
+                throw new IllegalStateException(
+                    "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
+                    ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
+            }
+
+            WebSocketFrame frame = (WebSocketFrame) message;
+            if (frame instanceof TextWebSocketFrame) {
+                TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
+                LOG.warn("WebSocket Client received message: {}", textFrame.text());
+                ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
+            } else if (frame instanceof BinaryWebSocketFrame) {
+                BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
+                LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
+                listener.onData(binaryFrame.content());
+            } else if (frame instanceof PongWebSocketFrame) {
+                LOG.trace("WebSocket Client received pong");
+            } else if (frame instanceof CloseWebSocketFrame) {
+                LOG.trace("WebSocket Client received closing");
+                ch.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
index 75e30f9..e9d50b7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.activemq.util.Wait;
 import org.apache.qpid.proton.engine.Connection;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test handling of heartbeats requested by the broker.
  */
+@RunWith(Parameterized.class)
 public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
 
     private final int TEST_IDLE_TIMEOUT = 3000;
 
+    @Parameters(name="connector={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"amqp", false},
+            {"amqp+ws", false},
+        });
+    }
+
+    public AmqpBrokerReuqestedHearbeatsTest(String connectorScheme, boolean secure) {
+        super(connectorScheme, secure);
+    }
+
     @Override
     protected String getAdditionalConfig() {
         return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT;

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
index c7ab0cd..3c779a2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.activemq.util.Wait;
 import org.apache.qpid.proton.engine.Connection;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Tests that cover broker behavior when the client requests heartbeats
  */
+@RunWith(Parameterized.class)
 public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
 
     private final int TEST_IDLE_TIMEOUT = 3000;
 
+    @Parameters(name="connector={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"amqp", false},
+            {"amqp+ws", false},
+        });
+    }
+
+    public AmqpClientRequestsHeartbeatsTest(String connectorScheme, boolean secure) {
+        super(connectorScheme, secure);
+    }
+
     @Override
     protected String getAdditionalConfig() {
         return "&transport.wireFormat.idleTimeout=0";

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
index fa519ab..2d154e6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.activemq.transport.amqp.AmqpSupport;
@@ -37,16 +39,34 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Connection;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test broker handling of AMQP connections with various configurations.
  */
+@RunWith(Parameterized.class)
 public class AmqpConnectionsTest extends AmqpClientTestSupport {
 
     private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
     private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
     private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
 
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"amqp", false},
+            {"amqp+ws", false},
+            {"amqp+ssl", true},
+            {"amqp+wss", true}
+        });
+    }
+
+    public AmqpConnectionsTest(String connectorScheme, boolean secure) {
+        super(connectorScheme, secure);
+    }
+
     @Test(timeout = 60000)
     public void testCanConnect() throws Exception {
         AmqpClient client = createAmqpClient();

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index d25017d..f88b152 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.transport.amqp=DEBUG
+log4j.logger.org.apache.activemq.transport.amqp=TRACE
 log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
 log4j.logger.org.fusesource=INFO
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index 9e13cf9..5f75a3c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.security.cert.X509Certificate;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +35,7 @@ import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -438,4 +440,19 @@ public class VMTransport implements Transport, Task {
     public int getReceiveCounter() {
         return receiveCounter;
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
index d24596a..2067c14 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
@@ -18,7 +18,10 @@ package org.apache.activemq.transport;
 
 import java.io.IOException;
 import java.net.URI;
+import java.security.cert.X509Certificate;
+
 import org.apache.activemq.Service;
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * Represents the client side of a transport allowing messages to be sent
@@ -116,6 +119,7 @@ public interface Transport extends Service {
      * @return true if updating uris is supported
      */
     boolean isUpdateURIsSupported();
+
     /**
      * reconnect to another location
      * @param uri
@@ -139,4 +143,25 @@ public interface Transport extends Service {
      * @return a counter which gets incremented as data is read from the transport.
      */
     int getReceiveCounter();
+
+    /**
+     * @return the Certificates provided by the peer, or null if not a secure channel.
+     */
+    X509Certificate[] getPeerCertificates();
+
+    /**
+     * Sets the certificates provided by the connected peer.
+     *
+     * @param certificates
+     *      the Certificates provided by the peer.
+     */
+    void setPeerCertificates(X509Certificate[] certificates);
+
+    /**
+     * Retrieves the WireFormat instance associated with this Transport instance.
+     *
+     * @return the WireFormat in use.
+     */
+    WireFormat getWireFormat();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
index b0fafe8..ce02a7a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
@@ -18,9 +18,12 @@ package org.apache.activemq.transport;
 
 import java.io.IOException;
 import java.net.URI;
+import java.security.cert.X509Certificate;
+
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * 
+ *
  */
 public class TransportFilter implements TransportListener, Transport {
     protected final Transport next;
@@ -30,10 +33,12 @@ public class TransportFilter implements TransportListener, Transport {
         this.next = next;
     }
 
+    @Override
     public TransportListener getTransportListener() {
         return transportListener;
     }
 
+    @Override
     public void setTransportListener(TransportListener channelListener) {
         this.transportListener = channelListener;
         if (channelListener == null) {
@@ -48,6 +53,7 @@ public class TransportFilter implements TransportListener, Transport {
      * @throws IOException
      *             if the next channel has not been set.
      */
+    @Override
     public void start() throws Exception {
         if (next == null) {
             throw new IOException("The next channel has not been set.");
@@ -61,10 +67,12 @@ public class TransportFilter implements TransportListener, Transport {
     /**
      * @see org.apache.activemq.Service#stop()
      */
+    @Override
     public void stop() throws Exception {
         next.stop();
     }
 
+    @Override
     public void onCommand(Object command) {
         transportListener.onCommand(command);
     }
@@ -81,34 +89,42 @@ public class TransportFilter implements TransportListener, Transport {
         return next.toString();
     }
 
+    @Override
     public void oneway(Object command) throws IOException {
         next.oneway(command);
     }
 
+    @Override
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
         return next.asyncRequest(command, null);
     }
 
+    @Override
     public Object request(Object command) throws IOException {
         return next.request(command);
     }
 
+    @Override
     public Object request(Object command, int timeout) throws IOException {
         return next.request(command, timeout);
     }
 
+    @Override
     public void onException(IOException error) {
         transportListener.onException(error);
     }
 
+    @Override
     public void transportInterupted() {
         transportListener.transportInterupted();
     }
 
+    @Override
     public void transportResumed() {
         transportListener.transportResumed();
     }
 
+    @Override
     public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
             return target.cast(this);
@@ -116,6 +132,7 @@ public class TransportFilter implements TransportListener, Transport {
         return next.narrow(target);
     }
 
+    @Override
     public String getRemoteAddress() {
         return next.getRemoteAddress();
     }
@@ -124,35 +141,58 @@ public class TransportFilter implements TransportListener, Transport {
      * @return
      * @see org.apache.activemq.transport.Transport#isFaultTolerant()
      */
+    @Override
     public boolean isFaultTolerant() {
         return next.isFaultTolerant();
     }
 
+    @Override
     public boolean isDisposed() {
         return next.isDisposed();
     }
 
+    @Override
     public boolean isConnected() {
         return next.isConnected();
     }
 
+    @Override
     public void reconnect(URI uri) throws IOException {
         next.reconnect(uri);
     }
 
+    @Override
     public int getReceiveCounter() {
         return next.getReceiveCounter();
     }
 
+    @Override
     public boolean isReconnectSupported() {
         return next.isReconnectSupported();
     }
 
+    @Override
     public boolean isUpdateURIsSupported() {
         return next.isUpdateURIsSupported();
     }
 
+    @Override
     public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
         next.updateURIs(rebalance,uris);
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return next.getPeerCertificates();
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+        next.setPeerCertificates(certificates);
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return next.getWireFormat();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index f502179..a46b318 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -26,6 +26,7 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1448,4 +1450,28 @@ public class FailoverTransport implements CompositeTransport {
     public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
         this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        Transport transport = connectedTransport.get();
+        if (transport != null) {
+            return transport.getPeerCertificates();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        Transport transport = connectedTransport.get();
+        if (transport != null) {
+            return transport.getWireFormat();
+        } else {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
index 00ae7ae..d7f4f85 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.fanout;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
@@ -44,13 +45,12 @@ import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A Transport that fans out a connection to multiple brokers.
- *
- *
  */
 public class FanoutTransport implements CompositeTransport {
 
@@ -113,9 +113,9 @@ public class FanoutTransport implements CompositeTransport {
 
         @Override
         public void onCommand(Object o) {
-            Command command = (Command)o;
+            Command command = (Command) o;
             if (command.isResponse()) {
-                Integer id = new Integer(((Response)command).getCorrelationId());
+                Integer id = new Integer(((Response) command).getCorrelationId());
                 RequestCounter rc = requestMap.get(id);
                 if (rc != null) {
                     if (rc.ackCount.decrementAndGet() <= 0) {
@@ -191,7 +191,7 @@ public class FanoutTransport implements CompositeTransport {
 
                     // Try to connect them up.
                     Iterator<FanoutTransportHandler> iter = transports.iterator();
-                    for (int i = 0; iter.hasNext() && !disposed; i++) {
+                    while (iter.hasNext() && !disposed) {
 
                         long now = System.currentTimeMillis();
 
@@ -228,9 +228,9 @@ public class FanoutTransport implements CompositeTransport {
                         } catch (Exception e) {
                             LOG.debug("Connect fail to: " + uri + ", reason: " + e);
 
-                            if( fanoutHandler.transport !=null ) {
+                            if (fanoutHandler.transport != null) {
                                 ServiceSupport.dispose(fanoutHandler.transport);
-                                fanoutHandler.transport=null;
+                                fanoutHandler.transport = null;
                             }
 
                             if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
@@ -256,14 +256,13 @@ public class FanoutTransport implements CompositeTransport {
                             }
                         }
                     }
+
                     if (transports.size() == connectedCount || disposed) {
                         reconnectMutex.notifyAll();
                         return false;
                     }
-
                 }
             }
-
         }
 
         try {
@@ -292,7 +291,7 @@ public class FanoutTransport implements CompositeTransport {
                     restoreTransport(th);
                 }
             }
-            connected=true;
+            connected = true;
         }
     }
 
@@ -307,7 +306,7 @@ public class FanoutTransport implements CompositeTransport {
                 }
                 started = false;
                 disposed = true;
-                connected=false;
+                connected = false;
 
                 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                     FanoutTransportHandler th = iter.next();
@@ -367,7 +366,7 @@ public class FanoutTransport implements CompositeTransport {
 
     @Override
     public void oneway(Object o) throws IOException {
-        final Command command = (Command)o;
+        final Command command = (Command) o;
         try {
             synchronized (reconnectMutex) {
 
@@ -392,7 +391,7 @@ public class FanoutTransport implements CompositeTransport {
                     }
 
                     if (error instanceof IOException) {
-                        throw (IOException)error;
+                        throw (IOException) error;
                     }
                     throw IOExceptionSupport.create(error);
                 }
@@ -428,7 +427,6 @@ public class FanoutTransport implements CompositeTransport {
                         primary.onException(e);
                     }
                 }
-
             }
         } catch (InterruptedException e) {
             // Some one may be trying to stop our thread.
@@ -443,13 +441,12 @@ public class FanoutTransport implements CompositeTransport {
      */
     private boolean isFanoutCommand(Command command) {
         if (command.isMessage()) {
-            if( fanOutQueues ) {
+            if (fanOutQueues) {
                 return true;
             }
-            return ((Message)command).getDestination().isTopic();
+            return ((Message) command).getDestination().isTopic();
         }
-        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
-                command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
+        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
             return false;
         }
         return true;
@@ -491,7 +488,6 @@ public class FanoutTransport implements CompositeTransport {
 
     @Override
     public <T> T narrow(Class<T> target) {
-
         if (target.isAssignableFrom(getClass())) {
             return target.cast(this);
         }
@@ -509,7 +505,6 @@ public class FanoutTransport implements CompositeTransport {
         }
 
         return null;
-
     }
 
     protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
@@ -523,8 +518,7 @@ public class FanoutTransport implements CompositeTransport {
     }
 
     @Override
-    public void add(boolean reblance,URI uris[]) {
-
+    public void add(boolean reblance, URI uris[]) {
         synchronized (reconnectMutex) {
             for (int i = 0; i < uris.length; i++) {
                 URI uri = uris[i];
@@ -537,6 +531,7 @@ public class FanoutTransport implements CompositeTransport {
                         break;
                     }
                 }
+
                 if (!match) {
                     FanoutTransportHandler th = new FanoutTransportHandler(uri);
                     transports.add(th);
@@ -544,12 +539,10 @@ public class FanoutTransport implements CompositeTransport {
                 }
             }
         }
-
     }
 
     @Override
-    public void remove(boolean rebalance,URI uris[]) {
-
+    public void remove(boolean rebalance, URI uris[]) {
         synchronized (reconnectMutex) {
             for (int i = 0; i < uris.length; i++) {
                 URI uri = uris[i];
@@ -567,13 +560,11 @@ public class FanoutTransport implements CompositeTransport {
                 }
             }
         }
-
     }
 
     @Override
     public void reconnect(URI uri) throws IOException {
-        add(true,new URI[]{uri});
-
+        add(true, new URI[] { uri });
     }
 
     @Override
@@ -585,12 +576,12 @@ public class FanoutTransport implements CompositeTransport {
     public boolean isUpdateURIsSupported() {
         return true;
     }
+
     @Override
-    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
-        add(reblance,uris);
+    public void updateURIs(boolean reblance, URI[] uris) throws IOException {
+        add(reblance, uris);
     }
 
-
     @Override
     public String getRemoteAddress() {
         if (primary != null) {
@@ -625,7 +616,6 @@ public class FanoutTransport implements CompositeTransport {
         return disposed;
     }
 
-
     @Override
     public boolean isConnected() {
         return connected;
@@ -643,4 +633,19 @@ public class FanoutTransport implements CompositeTransport {
         }
         return rc;
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
index 60c94af..8b00e27 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,16 +18,16 @@ package org.apache.activemq.transport.mock;
 
 import java.io.IOException;
 import java.net.URI;
+import java.security.cert.X509Certificate;
+
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.wireformat.WireFormat;
 
-/**
- * 
- */
 public class MockTransport extends DefaultTransportListener implements Transport {
 
     protected Transport next;
@@ -37,8 +37,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
         this.next = next;
     }
 
-    /**
-     */
+    @Override
     public synchronized void setTransportListener(TransportListener channelListener) {
         this.transportListener = channelListener;
         if (channelListener == null) {
@@ -50,8 +49,10 @@ public class MockTransport extends DefaultTransportListener implements Transport
 
     /**
      * @see org.apache.activemq.Service#start()
-     * @throws IOException if the next channel has not been set.
+     * @throws IOException
+     *         if the next channel has not been set.
      */
+    @Override
     public void start() throws Exception {
         if (getNext() == null) {
             throw new IOException("The next channel has not been set.");
@@ -65,6 +66,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
     /**
      * @see org.apache.activemq.Service#stop()
      */
+    @Override
     public void stop() throws Exception {
         getNext().stop();
     }
@@ -84,6 +86,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
     /**
      * @return Returns the packetListener.
      */
+    @Override
     public synchronized TransportListener getTransportListener() {
         return transportListener;
     }
@@ -93,18 +96,22 @@ public class MockTransport extends DefaultTransportListener implements Transport
         return getNext().toString();
     }
 
+    @Override
     public void oneway(Object command) throws IOException {
         getNext().oneway(command);
     }
 
+    @Override
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
         return getNext().asyncRequest(command, null);
     }
 
+    @Override
     public Object request(Object command) throws IOException {
         return getNext().request(command);
     }
 
+    @Override
     public Object request(Object command, int timeout) throws IOException {
         return getNext().request(command, timeout);
     }
@@ -114,6 +121,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
         getTransportListener().onException(error);
     }
 
+    @Override
     public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
             return target.cast(this);
@@ -131,6 +139,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
         setNext(filter);
     }
 
+    @Override
     public String getRemoteAddress() {
         return getNext().getRemoteAddress();
     }
@@ -138,35 +147,58 @@ public class MockTransport extends DefaultTransportListener implements Transport
     /**
      * @see org.apache.activemq.transport.Transport#isFaultTolerant()
      */
+    @Override
     public boolean isFaultTolerant() {
         return getNext().isFaultTolerant();
     }
 
-	public boolean isDisposed() {
-		return getNext().isDisposed();
-	}
-	
-	public boolean isConnected() {
-       return getNext().isConnected();
+    @Override
+    public boolean isDisposed() {
+        return getNext().isDisposed();
     }
 
-	public void reconnect(URI uri) throws IOException {
-		getNext().reconnect(uri);
-	}
+    @Override
+    public boolean isConnected() {
+        return getNext().isConnected();
+    }
 
+    @Override
+    public void reconnect(URI uri) throws IOException {
+        getNext().reconnect(uri);
+    }
+
+    @Override
     public int getReceiveCounter() {
         return getNext().getReceiveCounter();
     }
-    
 
+    @Override
     public boolean isReconnectSupported() {
         return getNext().isReconnectSupported();
     }
 
+    @Override
     public boolean isUpdateURIsSupported() {
         return getNext().isUpdateURIsSupported();
     }
-    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
-       getNext().updateURIs(reblance,uris);
+
+    @Override
+    public void updateURIs(boolean reblance, URI[] uris) throws IOException {
+        getNext().updateURIs(reblance, uris);
+    }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return getNext().getPeerCertificates();
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+        getNext().setPeerCertificates(certificates);
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return getNext().getWireFormat();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
index 2b3953f..0c2fab9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.transport.tcp;
 
 import java.io.IOException;
@@ -43,6 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
  * unexpected situations may occur.
  */
 public class SslTransport extends TcpTransport {
+
     /**
      * Connect to a remote node such as a Broker.
      *
@@ -56,6 +56,7 @@ public class SslTransport extends TcpTransport {
      * @throws UnknownHostException If TcpTransport throws.
      * @throws IOException If TcpTransport throws.
      */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
         if (this.socket != null) {
@@ -65,7 +66,7 @@ public class SslTransport extends TcpTransport {
             // a single proxy to route to different messaging apps.
 
             // On java 1.7 it seems like it can only be configured via reflection.
-            // todo: find out if this will work on java 1.8
+            // TODO: find out if this will work on java 1.8
             HashMap props = new HashMap();
             props.put("host", remoteLocation.getHost());
             IntrospectionSupport.setProperties(this.socket, props);
@@ -110,6 +111,7 @@ public class SslTransport extends TcpTransport {
     /**
      * @return peer certificate chain associated with the ssl socket
      */
+    @Override
     public X509Certificate[] getPeerCertificates() {
 
         SSLSocket sslSocket = (SSLSocket)this.socket;
@@ -120,7 +122,7 @@ public class SslTransport extends TcpTransport {
         try {
             clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
         } catch (SSLPeerUnverifiedException e) {
-        	clientCertChain = null;
+            clientCertChain = null;
         }
 
         return clientCertChain;
@@ -133,5 +135,4 @@ public class SslTransport extends TcpTransport {
     public String toString() {
         return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
index 60fe283..04d1636 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
@@ -1,5 +1,5 @@
-/**
-gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -51,12 +52,11 @@ import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
- *
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
- *
  */
 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
+
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
     protected final URI remoteLocation;
     protected final URI localLocation;
     protected final WireFormat wireFormat;
@@ -754,4 +754,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
     public WireFormat getWireFormat() {
         return wireFormat;
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
index daa4860..d1ac088 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,7 +20,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.DatagramSocket;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketException;
@@ -28,6 +27,7 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.DatagramChannel;
+import java.security.cert.X509Certificate;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
@@ -47,10 +47,9 @@ import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the {@link Transport} interface using raw UDP
- * 
- * 
  */
 public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
+
     private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class);
 
     private static final int MAX_BIND_ATTEMPTS = 50;
@@ -112,6 +111,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
     /**
      * A one way asynchronous send
      */
+    @Override
     public void oneway(Object command) throws IOException {
         oneway(command, targetAddress);
     }
@@ -130,6 +130,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
     /**
      * @return pretty print of 'this'
      */
+    @Override
     public String toString() {
         if (description != null) {
             return description + port;
@@ -141,6 +142,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
     /**
      * reads packets from a Socket
      */
+    @Override
     public void run() {
         LOG.trace("Consumer thread starting for: " + toString());
         while (!isStopped()) {
@@ -350,6 +352,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
         return host;
     }
 
+    @Override
     protected void doStart() throws Exception {
         getCommandChannel().start();
 
@@ -387,7 +390,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
         // down
         // a previously bound socket, it can take a little while before we can
         // bind it again.
-        // 
+        //
         for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
             try {
                 socket.bind(localAddress);
@@ -419,6 +422,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
         return new InetSocketAddress(port);
     }
 
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (channel != null) {
             channel.close();
@@ -457,6 +461,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
         }
     }
 
+    @Override
     public String getRemoteAddress() {
         if (targetAddress != null) {
             return "" + targetAddress;
@@ -464,10 +469,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
         return null;
     }
 
+    @Override
     public int getReceiveCounter() {
         if (commandChannel == null) {
             return 0;
         }
         return commandChannel.getReceiveCounter();
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
new file mode 100644
index 0000000..e15f86f
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.transport.Transport;
+
+/**
+ * Interface for a WebSocket Transport which provides hooks that a servlet can
+ * use to pass along WebSocket data and events.
+ */
+public interface WSTransport extends Transport {
+
+    /**
+     * WS Transport output sink, used to give the WS Transport implementation
+     * a way to produce output back to the WS connection without coupling it
+     * to the implementation.
+     */
+    public interface WSTransportSink {
+
+        /**
+         * Called from the Transport when new outgoing String data is ready.
+         *
+         * @param data
+         *      The newly prepared outgoing string data.
+         *
+         * @throws IOException if an error occurs or the socket doesn't support text data.
+         */
+        void onSocketOutboundText(String data) throws IOException;
+
+        /**
+         * Called from the Transport when new outgoing binary data is ready.
+         *
+         * @param data
+         *      The newly prepared outgoing binary data.
+         *
+         * @throws IOException if an error occurs or the socket doesn't support binary data.
+         */
+        void onSocketOutboundBinary(ByteBuffer data) throws IOException;
+    }
+
+    /**
+     * @return the WS sub-protocol that this transport is supplying.
+     */
+    String getSubProtocol();
+
+    /**
+     * Called to provide the WS Transport with the output data sink.
+     */
+    void setTransportSink(WSTransportSink outputSink);
+
+    /**
+     * Called from the WebSocket framework when new incoming String data is received.
+     *
+     * @param data
+     *      The newly received incoming data.
+     *
+     * @throws IOException if an error occurs or the socket doesn't support text data.
+     */
+    void onWebSocketText(String data) throws IOException;
+
+    /**
+     * Called from the WebSocket framework when new incoming Binary data is received.
+     *
+     * @param data
+     *      The newly received incoming data.
+     *
+     * @throws IOException if an error occurs or the socket doesn't support binary data.
+     */
+    void onWebSocketBinary(ByteBuffer data) throws IOException;
+
+    /**
+     * Called from the WebSocket framework when the socket has been closed unexpectedly.
+     *
+     * @throws IOException if an error occurs while processing the close.
+     */
+    void onWebSocketClosed() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml
index 5b288c5..e15e076 100755
--- a/activemq-http/pom.xml
+++ b/activemq-http/pom.xml
@@ -228,7 +228,7 @@
         </plugins>
       </build>
     </profile>
-	<profile>
+    <profile>
       <id>activemq.tests-autoTransport</id>
       <activation>
         <property>

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
index 4132c7c..fdf85b3 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
@@ -17,21 +17,22 @@
 package org.apache.activemq.transport.http;
 
 import java.io.IOException;
+import java.security.cert.X509Certificate;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.transport.TransportSupport;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * A server side HTTP based TransportChannel which processes incoming packets
  * and adds outgoing packets onto a {@link Queue} so that they can be dispatched
  * by the HTTP GET requests from the client.
- *
- * 
  */
-public class  BlockingQueueTransport extends TransportSupport {
+public class BlockingQueueTransport extends TransportSupport {
+
     public static final long MAX_TIMEOUT = 30000L;
 
     private BlockingQueue<Object> queue;
@@ -44,6 +45,7 @@ public class  BlockingQueueTransport extends TransportSupport {
         return queue;
     }
 
+    @Override
     public void oneway(Object command) throws IOException {
         try {
             boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -55,18 +57,35 @@ public class  BlockingQueueTransport extends TransportSupport {
         }
     }
 
-
+    @Override
     public String getRemoteAddress() {
         return "blockingQueue_" + queue.hashCode();
     }
 
+    @Override
     protected void doStart() throws Exception {
     }
 
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
     }
 
+    @Override
     public int getReceiveCounter() {
         return 0;
     }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
index c65dbb9..7f446c5 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
@@ -20,6 +20,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.security.cert.X509Certificate;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -30,6 +31,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
 import org.apache.http.Header;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
@@ -396,4 +398,17 @@ public class HttpClientTransport extends HttpTransportSupport {
         this.minSendAsCompressedSize = minSendAsCompressedSize;
     }
 
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return null;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return getTextWireFormat();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java b/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
index 4daaf65..a8309b6 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
@@ -33,4 +33,25 @@ public class HttpTransportUtils {
         remoteAddress.append(request.getRemotePort());
         return remoteAddress.toString();
     }
+
+    public static String generateWsRemoteAddress(HttpServletRequest request, String schemePrefix) {
+        if (request == null) {
+            throw new IllegalArgumentException("HttpServletRequest must not be null.");
+        }
+
+        StringBuilder remoteAddress = new StringBuilder();
+        String scheme = request.getScheme();
+        if (scheme != null && scheme.equalsIgnoreCase("https")) {
+            scheme = schemePrefix + "+wss://";
+        } else {
+            scheme = schemePrefix + "+ws://";
+        }
+
+        remoteAddress.append(scheme);
+        remoteAddress.append(request.getRemoteAddr());
+        remoteAddress.append(":");
+        remoteAddress.append(request.getRemotePort());
+
+        return remoteAddress.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
index b8e0f8f..dd25a1d 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
@@ -147,7 +147,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
                     stompInactivityMonitor.onCommand(new KeepAliveInfo());
                 } else {
                     StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")));
-                    frame.setTransportContext(getCertificates());
+                    frame.setTransportContext(getPeerCertificates());
                     protocolConverter.onStompCommand(frame);
                 }
             }
@@ -162,11 +162,13 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
         return socketTransportStarted.getCount() == 0;
     }
 
-    public X509Certificate[] getCertificates() {
+    @Override
+    public X509Certificate[] getPeerCertificates() {
         return certificates;
     }
 
-    public void setCertificates(X509Certificate[] certificates) {
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
         this.certificates = certificates;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
index 3ded98f..340505a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
@@ -129,10 +129,6 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
         }
     }
 
-
-    /* (non-Javadoc)
-     * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String)
-     */
     @Override
     public void onWebSocketClose(int statusCode, String reason) {
         LOG.trace("STOMP WS Connection closed, code:{} message:{}", statusCode, reason);
@@ -140,15 +136,10 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
         this.connection = null;
         this.closeCode = statusCode;
         this.closeMessage = reason;
-
     }
 
-    /* (non-Javadoc)
-     * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
-     */
     @Override
-    public void onWebSocketConnect(
-            org.eclipse.jetty.websocket.api.Session session) {
+    public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
         this.connection = session;
         this.connectLatch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
index bfcb5df..744685b 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
@@ -23,6 +23,8 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport;
 /**
  * Factory for WebSocket (ws) transport
  */
-public class WSTransportFactory extends TransportFactory {
+public class WSTransportFactory extends TransportFactory implements BrokerServiceAware {
+
+    private BrokerService brokerService;
 
     @Override
     public TransportServer doBind(URI location) throws IOException {
@@ -42,6 +46,7 @@ public class WSTransportFactory extends TransportFactory {
             Map<String, Object> httpOptions = IntrospectionSupport.extractProperties(options, "http.");
             Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "");
             IntrospectionSupport.setProperties(result, transportOptions);
+            result.setBrokerService(brokerService);
             result.setTransportOption(transportOptions);
             result.setHttpOptions(httpOptions);
             return result;
@@ -49,4 +54,9 @@ public class WSTransportFactory extends TransportFactory {
             throw IOExceptionSupport.create(e);
         }
     }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
new file mode 100644
index 0000000..7d3ba18
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.ws.WSTransport.WSTransportSink;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A proxy class that manages sending WebSocket events to the wrapped protocol level
+ * WebSocket Transport.
+ */
+public final class WSTransportProxy extends TransportSupport implements Transport, WebSocketListener, BrokerServiceAware, WSTransportSink {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WSTransportProxy.class);
+
+    private final int ORDERLY_CLOSE_TIMEOUT = 10;
+
+    private final ReentrantLock protocolLock = new ReentrantLock();
+    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    private final String remoteAddress;
+
+    private final Transport transport;
+    private final WSTransport wsTransport;
+    private Session session;
+
+    /**
+     * Create a WebSocket Transport Proxy instance that will pass
+     * along WebSocket events to the underlying protocol level transport.
+     *
+     * @param remoteAddress
+     *      the provided remote address to report being connected to.
+     * @param transport
+     *      The protocol level WebSocket Transport
+     */
+    public WSTransportProxy(String remoteAddress, Transport transport) {
+        this.remoteAddress = remoteAddress;
+        this.transport = transport;
+        this.wsTransport = transport.narrow(WSTransport.class);
+
+        if (wsTransport == null) {
+            throw new IllegalArgumentException("Provided Transport does not contains a WSTransport implementation");
+        } else {
+            wsTransport.setTransportSink(this);
+        }
+    }
+
+    /**
+     * @return the sub-protocol of the proxied transport.
+     */
+    public String getSubProtocol() {
+        return wsTransport.getSubProtocol();
+    }
+
+    /**
+     * Apply any configured Transport options on the wrapped Transport and its contained
+     * wireFormat instance.
+     */
+    public void setTransportOptions(Map<String, Object> options) {
+        Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
+
+        IntrospectionSupport.setProperties(transport, options);
+        IntrospectionSupport.setProperties(transport.getWireFormat(), wireFormatOptions);
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        if (transport instanceof BrokerServiceAware) {
+            ((BrokerServiceAware) transport).setBrokerService(brokerService);
+        }
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        protocolLock.lock();
+        try {
+            transport.oneway(command);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
+        }
+    }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return transport.getPeerCertificates();
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+        transport.setPeerCertificates(certificates);
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return transport.getWireFormat();
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return transport.getReceiveCounter();
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        transport.stop();
+        if (session != null && session.isOpen()) {
+            session.close();
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
+
+        transport.setTransportListener(getTransportListener());
+        transport.start();
+    }
+
+    //----- WebSocket methods being proxied to the WS Transport --------------//
+
+    @Override
+    public void onWebSocketBinary(byte[] payload, int offset, int length) {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for WebSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        protocolLock.lock();
+        try {
+            wsTransport.onWebSocketBinary(ByteBuffer.wrap(payload, offset, length));
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
+        }
+    }
+
+    @Override
+    public void onWebSocketText(String data) {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for WebSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        protocolLock.lock();
+        try {
+            wsTransport.onWebSocketText(data);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
+        }
+    }
+
+    @Override
+    public void onWebSocketClose(int statusCode, String reason) {
+        try {
+            if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
+                LOG.debug("WebSocket closed: code[{}] message[{}]", statusCode, reason);
+                wsTransport.onWebSocketClosed();
+            }
+        } catch (Exception e) {
+            LOG.debug("Failed to close WebSocket cleanly", e);
+        } finally {
+            if (protocolLock.isHeldByCurrentThread()) {
+                protocolLock.unlock();
+            }
+        }
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+        onException(IOExceptionSupport.create(cause));
+    }
+
+    @Override
+    public void onSocketOutboundText(String data) throws IOException {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for WebSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        LOG.trace("WS Proxy sending string of size {} out", data.length());
+        session.getRemote().sendString(data);
+    }
+
+    @Override
+    public void onSocketOutboundBinary(ByteBuffer data) throws IOException {
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for WebSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+        LOG.trace("WS Proxy sending {} bytes out", data.remaining());
+        int limit = data.limit();
+        session.getRemote().sendBytes(data);
+
+        // Reset back to original limit and move position to match limit indicating
+        // that we read everything, the websocket sender clears the passed buffer
+        // which can make it look as if nothing was written.
+        data.limit(limit);
+        data.position(limit);
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+}


Mime
View raw message