qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [16/27] Initial drop of donated AMQP Client Code.
Date Tue, 23 Sep 2014 18:20:40 GMT
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
new file mode 100644
index 0000000..234caaa
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
+ * type.
+ */
+public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
+
+    private final Message message;
+
+    /**
+     * Create a new delegate that uses Java serialization to store the message content.
+     *
+     * @param message
+     *        the AMQP message instance where the object is to be stored / read.
+     */
+    public AmqpTypedObjectDelegate(Message message) {
+        this.message = message;
+    }
+
+    @Override
+    public Serializable getObject() throws IOException, ClassNotFoundException {
+        // TODO: this should actually return a snapshot of the object, so we
+        // need to save the bytes so we can return an equal/unmodified object later
+
+        Section body = message.getBody();
+        if (body == null) {
+            return null;
+        } else if (body instanceof AmqpValue) {
+            // TODO: This is assuming the object can be immediately returned, and is
+            //       Serializable. We will actually have to ensure elements are
+            //       Serializable and e.g convert the Uint/Ubyte etc wrappers.
+            return (Serializable) ((AmqpValue) body).getValue();
+        } else if (body instanceof Data) {
+            // TODO: return as byte[]? ByteBuffer?
+            throw new UnsupportedOperationException("Data support still to be added");
+        } else if (body instanceof AmqpSequence) {
+            // TODO: return as list?
+            throw new UnsupportedOperationException("AmqpSequence support still to be added");
+        } else {
+            throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void setObject(Serializable value) throws IOException {
+        if (value == null) {
+            // TODO: verify whether not sending a body is OK, send some form of
+            // null (AmqpValue containing null) instead if it isn't?
+            message.setBody(null);
+        } else if (isSupportedAmqpValueObjectType(value)) {
+            // TODO: This is a temporary hack, we actually need to take a snapshot of the object
+            // at this point in time, not simply set the object itself into the Proton message.
+            // We will need to encode it now, first to save the snapshot to send, and also to
+            // verify up front that we can actually send it later.
+
+            // Even if we do that we would currently then need to decode it later to set the
+            // body to send, unless we augment Proton to allow setting the bytes directly.
+            // We will always need to decode bytes to return a snapshot from getObject(). We
+            // will need to save the bytes somehow to support that on received messages.
+            message.setBody(new AmqpValue(value));
+        } else {
+            // TODO: Data and AmqpSequence?
+            throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName());
+        }
+
+        // TODO: ensure content type is not set (assuming we aren't using data sections)?
+    }
+
+    private boolean isSupportedAmqpValueObjectType(Serializable serializable) {
+        // TODO: augment supported types to encode as an AmqpValue?
+        return serializable instanceof Map<?,?> ||
+               serializable instanceof List<?> ||
+               serializable.getClass().isArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
new file mode 100644
index 0000000..2f4db2c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -0,0 +1,857 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.message.JmsDefaultMessageFactory;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.DefaultProviderListener;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Provider Facade that provides services for detection dropped Provider connections
+ * and attempting to reconnect to a different remote peer.  Upon establishment of a new
+ * connection the FailoverProvider will initiate state recovery of the active JMS
+ * framework resources.
+ */
+public class FailoverProvider extends DefaultProviderListener implements Provider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverProvider.class);
+
+    private static final int UNLIMITED = -1;
+
+    private ProviderListener listener;
+    private Provider provider;
+    private final FailoverUriPool uris;
+
+    private final ExecutorService serializer;
+    private final ScheduledExecutorService connectionHub;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean failed = new AtomicBoolean();
+    private final AtomicLong requestId = new AtomicLong();
+    private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>();
+    private final DefaultProviderListener closedListener = new DefaultProviderListener();
+    private final JmsSslContext sslContext;
+    private final JmsMessageFactory defaultMessageFactory = new JmsDefaultMessageFactory();
+
+    // Current state of connection / reconnection
+    private boolean firstConnection = true;
+    private long reconnectAttempts;
+    private long reconnectDelay = TimeUnit.SECONDS.toMillis(5);
+    private IOException failureCause;
+    private URI connectedURI;
+
+    // Timeout values configured via JmsConnectionInfo
+    private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
+    private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
+    private long sendTimeout =  JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
+    private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
+
+    // Configuration values.
+    private long initialReconnectDelay = 0L;
+    private long maxReconnectDelay = TimeUnit.SECONDS.toMillis(30);
+    private boolean useExponentialBackOff = true;
+    private double backOffMultiplier = 2d;
+    private int maxReconnectAttempts = UNLIMITED;
+    private int startupMaxReconnectAttempts = UNLIMITED;
+    private int warnAfterReconnectAttempts = 10;
+
+    public FailoverProvider(Map<String, String> nestedOptions) {
+        this(null, nestedOptions);
+    }
+
+    public FailoverProvider(URI[] uris) {
+        this(uris, null);
+    }
+
+    public FailoverProvider(URI[] uris, Map<String, String> nestedOptions) {
+        this.uris = new FailoverUriPool(uris, nestedOptions);
+        this.sslContext = JmsSslContext.getCurrentSslContext();
+
+        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                serial.setName("FailoverProvider: serialization thread");
+                return serial;
+            }
+        });
+
+        // All Connection attempts happen in this schedulers thread.  Once a connection
+        // is established it will hand the open connection back to the serializer thread
+        // for state recovery.
+        this.connectionHub = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                serial.setName("FailoverProvider: connect thread");
+                return serial;
+            }
+        });
+    }
+
+    @Override
+    public void connect() throws IOException {
+        checkClosed();
+        LOG.debug("Performing initial connection attempt");
+        triggerReconnectionAttempt();
+    }
+
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        checkClosed();
+
+        if (listener == null) {
+            throw new IllegalStateException("No ProviderListener registered.");
+        }
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            final ProviderFuture request = new ProviderFuture();
+            serializer.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        IOException error = failureCause != null ? failureCause : new IOException("Connection closed");
+                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        for (FailoverRequest request : pending) {
+                            request.onFailure(error);
+                        }
+
+                        if (provider != null) {
+                            provider.close();
+                        }
+                    } catch (Exception e) {
+                        LOG.debug("Caught exception while closing connection");
+                    } finally {
+
+                        if (connectionHub != null) {
+                            connectionHub.shutdown();
+                        }
+
+                        if (serializer != null) {
+                            serializer.shutdown();
+                        }
+
+                        request.onSuccess();
+                    }
+                }
+            });
+
+            try {
+                if (this.closeTimeout >= 0) {
+                    request.sync();
+                } else {
+                    request.sync(closeTimeout, TimeUnit.MILLISECONDS);
+                }
+            } catch (IOException e) {
+                LOG.warn("Error caught while closing Provider: ", e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void create(final JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                if (resource instanceof JmsConnectionInfo) {
+                    JmsConnectionInfo connectionInfo = (JmsConnectionInfo) resource;
+                    connectTimeout = connectionInfo.getConnectTimeout();
+                    closeTimeout = connectionInfo.getCloseTimeout();
+                    sendTimeout = connectionInfo.getSendTimeout();
+                    requestTimeout = connectionInfo.getRequestTimeout();
+                }
+
+                provider.create(resource, this);
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void start(final JmsResource resource, final AsyncResult request) throws IOException, JMSException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.start(resource, this);
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void destroy(final JmsResource resourceId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws IOException, JMSException, UnsupportedOperationException {
+                provider.destroy(resourceId, this);
+            }
+
+            @Override
+            public boolean succeedsWhenOffline() {
+                // Allow this to succeed, acks would be stale.
+                return true;
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void send(final JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.send(envelope, this);
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void acknowledge(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.acknowledge(sessionId, this);
+            }
+
+            @Override
+            public boolean succeedsWhenOffline() {
+                // Allow this to succeed, acks would be stale.
+                return true;
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.acknowledge(envelope, ackType, this);
+            }
+
+            @Override
+            public boolean succeedsWhenOffline() {
+                // Allow this to succeed, acks would be stale.
+                return true;
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void commit(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.commit(sessionId, this);
+            }
+
+            @Override
+            public boolean failureWhenOffline() {
+                return true;
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void rollback(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.rollback(sessionId, this);
+            }
+
+            @Override
+            public boolean failureWhenOffline() {
+                return true;
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+
+    @Override
+    public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.recover(sessionId, this);
+            }
+
+            @Override
+            public boolean succeedsWhenOffline() {
+                return true;
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void unsubscribe(final String subscription, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.unsubscribe(subscription, this);
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException, UnsupportedOperationException {
+        checkClosed();
+        final FailoverRequest pending = new FailoverRequest(request) {
+            @Override
+            public void doTask() throws Exception {
+                provider.pull(consumerId, timeout, this);
+            }
+        };
+
+        serializer.execute(pending);
+    }
+
+    @Override
+    public JmsMessageFactory getMessageFactory() {
+        final AtomicReference<JmsMessageFactory> result =
+            new AtomicReference<JmsMessageFactory>(defaultMessageFactory);
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                if (provider != null) {
+                    result.set(provider.getMessageFactory());
+                }
+            }
+        });
+
+        return result.get();
+    }
+
+    //--------------- Connection Error and Recovery methods ------------------//
+
+    /**
+     * This method is always called from within the FailoverProvider's serialization thread.
+     *
+     * When a failure is encountered either from an outgoing request or from an error fired
+     * from the underlying Provider instance this method is called to determine if a reconnect
+     * is allowed and if so a new reconnect cycle is triggered on the connection thread.
+     *
+     * @param cause
+     */
+    private void handleProviderFailure(final IOException cause) {
+        LOG.debug("handling Provider failure: {}", cause.getMessage());
+        LOG.trace("stack", cause);
+
+        this.provider.setProviderListener(closedListener);
+        URI failedURI = this.provider.getRemoteURI();
+        try {
+            this.provider.close();
+        } catch (Throwable error) {
+            LOG.trace("Caught exception while closing failed provider: {}", error.getMessage());
+        }
+        this.provider = null;
+
+        if (reconnectAllowed()) {
+            ProviderListener listener = this.listener;
+            if (listener != null) {
+                listener.onConnectionInterrupted(failedURI);
+            }
+            triggerReconnectionAttempt();
+        } else {
+            ProviderListener listener = this.listener;
+            if (listener != null) {
+                listener.onConnectionFailure(cause);
+            }
+        }
+    }
+
+    /**
+     * Called from the reconnection thread.  This method enqueues a new task that
+     * will attempt to recover connection state, once successful normal operations
+     * will resume.  If an error occurs while attempting to recover the JMS framework
+     * state then a reconnect cycle is again triggered on the connection thread.
+     *
+     * @param provider
+     *        The newly connect Provider instance that will become active.
+     */
+    private void initializeNewConnection(final Provider provider) {
+        this.serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (firstConnection) {
+                        firstConnection = false;
+                        FailoverProvider.this.provider = provider;
+                        provider.setProviderListener(FailoverProvider.this);
+
+                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        for (FailoverRequest request : pending) {
+                            request.run();
+                        }
+                    } else {
+                        LOG.debug("Signalling connection recovery: {}", provider);
+                        FailoverProvider.this.provider = provider;
+                        provider.setProviderListener(FailoverProvider.this);
+
+                        // Stage 1: Recovery all JMS Framework resources
+                        listener.onConnectionRecovery(provider);
+
+                        // Stage 2: Restart consumers, send pull commands, etc.
+                        listener.onConnectionRecovered(provider);
+
+                        // Stage 3: Let the client know that connection has restored.
+                        listener.onConnectionRestored(provider.getRemoteURI());
+
+                        // Stage 4: Send pending actions.
+                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        for (FailoverRequest request : pending) {
+                            request.run();
+                        }
+
+                        reconnectDelay = initialReconnectDelay;
+                        reconnectAttempts = 0;
+                        connectedURI = provider.getRemoteURI();
+                        uris.connected();
+                    }
+                } catch (Throwable error) {
+                    handleProviderFailure(IOExceptionSupport.create(error));
+                }
+            }
+        });
+    }
+
+    /**
+     * Called when the Provider was either first created or when a connection failure has
+     * been connected.  A reconnection attempt is immediately executed on the connection
+     * thread.  If a new Provider is able to be created and connected then a recovery task
+     * is scheduled on the main serializer thread.  If the connect attempt fails another
+     * attempt is scheduled based on the configured delay settings until a max attempts
+     * limit is hit, if one is set.
+     */
+    private void triggerReconnectionAttempt() {
+        if (closed.get() || failed.get()) {
+            return;
+        }
+
+        connectionHub.execute(new Runnable() {
+            @Override
+            public void run() {
+                if (provider != null || closed.get() || failed.get()) {
+                    return;
+                }
+
+                reconnectAttempts++;
+                Throwable failure = null;
+                URI target = uris.getNext();
+                if (target != null) {
+                    try {
+                        LOG.debug("Attempting connection to: {}", target);
+                        JmsSslContext.setCurrentSslContext(sslContext);
+                        Provider provider = ProviderFactory.createAsync(target);
+                        initializeNewConnection(provider);
+                        return;
+                    } catch (Throwable e) {
+                        LOG.info("Connection attempt to: {} failed.", target);
+                        failure = e;
+                    }
+                }
+
+                int reconnectLimit = reconnectAttemptLimit();
+
+                if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {
+                    LOG.error("Failed to connect after: " + reconnectAttempts + " attempt(s)");
+                    failed.set(true);
+                    failureCause = IOExceptionSupport.create(failure);
+                    if (listener != null) {
+                        listener.onConnectionFailure(failureCause);
+                    };
+
+                    return;
+                }
+
+                int warnInterval = getWarnAfterReconnectAttempts();
+                if (warnInterval > 0 && (reconnectAttempts % warnInterval) == 0) {
+                    LOG.warn("Failed to connect after: {} attempt(s) continuing to retry.", reconnectAttempts);
+                }
+
+                long delay = nextReconnectDelay();
+                connectionHub.schedule(this, delay, TimeUnit.MILLISECONDS);
+            }
+        });
+    }
+
+    private boolean reconnectAllowed() {
+        return reconnectAttemptLimit() != 0;
+    }
+
+    private int reconnectAttemptLimit() {
+        int maxReconnectValue = this.maxReconnectAttempts;
+        if (firstConnection && this.startupMaxReconnectAttempts != UNLIMITED) {
+            maxReconnectValue = this.startupMaxReconnectAttempts;
+        }
+        return maxReconnectValue;
+    }
+
+    private long nextReconnectDelay() {
+        if (useExponentialBackOff) {
+            // Exponential increment of reconnect delay.
+            reconnectDelay *= backOffMultiplier;
+            if (reconnectDelay > maxReconnectDelay) {
+                reconnectDelay = maxReconnectDelay;
+            }
+        }
+
+        return reconnectDelay;
+    }
+
+    protected void checkClosed() throws IOException {
+        if (closed.get()) {
+            throw new IOException("The Provider is already closed");
+        }
+    }
+
+    //--------------- DefaultProviderListener overrides ----------------------//
+
+    @Override
+    public void onMessage(final JmsInboundMessageDispatch envelope) {
+        if (closed.get() || failed.get()) {
+            return;
+        }
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                if (!closed.get()) {
+                    listener.onMessage(envelope);
+                }
+            }
+        });
+    }
+
+    @Override
+    public void onConnectionFailure(final IOException ex) {
+        if (closed.get() || failed.get()) {
+            return;
+        }
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                if (!closed.get() && !failed.get()) {
+                    LOG.debug("Failover: the provider reports failure: {}", ex.getMessage());
+                    handleProviderFailure(ex);
+                }
+            }
+        });
+    }
+
+    //--------------- URI update and rebalance methods -----------------------//
+
+    public void add(final URI uri) {
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                uris.add(uri);
+            }
+        });
+    }
+
+    public void remove(final URI uri) {
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                uris.remove(uri);
+            }
+        });
+    }
+
+    //--------------- Property Getters and Setters ---------------------------//
+
+    @Override
+    public URI getRemoteURI() {
+        Provider provider = this.provider;
+        if (provider != null) {
+            return provider.getRemoteURI();
+        }
+        return null;
+    }
+
+    @Override
+    public void setProviderListener(ProviderListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public ProviderListener getProviderListener() {
+        return listener;
+    }
+
+    public boolean isRandomize() {
+        return uris.isRandomize();
+    }
+
+    public void setRandomize(boolean value) {
+        this.uris.setRandomize(value);
+    }
+
+    public long getInitialReconnectDealy() {
+        return initialReconnectDelay;
+    }
+
+    public void setInitialReconnectDealy(long initialReconnectDealy) {
+        this.initialReconnectDelay = initialReconnectDealy;
+    }
+
+    public long getMaxReconnectDelay() {
+        return maxReconnectDelay;
+    }
+
+    public void setMaxReconnectDelay(long maxReconnectDelay) {
+        this.maxReconnectDelay = maxReconnectDelay;
+    }
+
+    public int getMaxReconnectAttempts() {
+        return maxReconnectAttempts;
+    }
+
+    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+        this.maxReconnectAttempts = maxReconnectAttempts;
+    }
+
+    public int getStartupMaxReconnectAttempts() {
+        return startupMaxReconnectAttempts;
+    }
+
+    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
+        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
+    }
+
+    /**
+     * Gets the current setting controlling how many Connect / Reconnect attempts must occur
+     * before a warn message is logged.  A value of {@code <= 0} indicates that there will be
+     * no warn message logged regardless of how many reconnect attempts occur.
+     *
+     * @return the current number of connection attempts before warn logging is triggered.
+     */
+    public int getWarnAfterReconnectAttempts() {
+        return warnAfterReconnectAttempts;
+    }
+
+    /**
+     * Sets the number of Connect / Reconnect attempts that must occur before a warn message
+     * is logged indicating that the transport is not connected.  This can be useful when the
+     * client is running inside some container or service as it gives an indication of some
+     * problem with the client connection that might not otherwise be visible.  To disable the
+     * log messages this value should be set to a value @{code attempts <= 0}
+     *
+     * @param warnAfterReconnectAttempts
+     *        The number of failed connection attempts that must happen before a warning is logged.
+     */
+    public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
+        this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
+    }
+
+    public double getReconnectDelayExponent() {
+        return backOffMultiplier;
+    }
+
+    public void setReconnectDelayExponent(double reconnectDelayExponent) {
+        this.backOffMultiplier = reconnectDelayExponent;
+    }
+
+    public boolean isUseExponentialBackOff() {
+        return useExponentialBackOff;
+    }
+
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        this.useExponentialBackOff = useExponentialBackOff;
+    }
+
+    public long getConnectTimeout() {
+        return this.connectTimeout;
+    }
+
+    public long getCloseTimeout() {
+        return this.closeTimeout;
+    }
+
+    public long getSendTimeout() {
+        return this.sendTimeout;
+    }
+
+    public long getRequestTimeout() {
+        return this.requestTimeout;
+    }
+
+    @Override
+    public String toString() {
+        return "FailoverProvider: " +
+               (connectedURI == null ? "unconnected" : connectedURI.toString());
+    }
+
+    //--------------- FailoverProvider Asynchronous Request --------------------//
+
+    /**
+     * For all requests that are dispatched from the FailoverProvider to a connected
+     * Provider instance an instance of FailoverRequest is used to handle errors that
+     * occur during processing of that request and trigger a reconnect.
+     *
+     * @param <T>
+     */
+    protected abstract class FailoverRequest extends ProviderFuture implements Runnable {
+
+        private final long id = requestId.incrementAndGet();
+
+        public FailoverRequest(AsyncResult watcher) {
+            super(watcher);
+        }
+
+        @Override
+        public void run() {
+            requests.put(id, this);
+            if (provider == null) {
+                if (failureWhenOffline()) {
+                    requests.remove(id);
+                    watcher.onFailure(new IOException("Provider disconnected"));
+                } else if (succeedsWhenOffline()) {
+                    onSuccess();
+                }
+            } else {
+                try {
+                    LOG.debug("Executing Failover Task: {}", this);
+                    doTask();
+                } catch (UnsupportedOperationException e) {
+                    requests.remove(id);
+                    watcher.onFailure(e);
+                } catch (Exception e) {
+                    // TODO Should we let JMSException through?
+                    LOG.debug("Caught exception while executing task: {}", e.getMessage());
+                    triggerReconnectionAttempt();
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(final Throwable result) {
+            if (closed.get() || failed.get()) {
+                requests.remove(id);
+                super.onFailure(result);
+            } else {
+                LOG.debug("Request received error: {}", result.getMessage());
+                serializer.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        handleProviderFailure(IOExceptionSupport.create(result));
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void onSuccess() {
+            requests.remove(id);
+            super.onSuccess();
+        }
+
+        /**
+         * Called to execute the specific task that was requested.
+         *
+         * @throws Exception if an error occurs during task execution.
+         */
+        public abstract void doTask() throws Exception;
+
+        /**
+         * Should the request just succeed when the Provider is not connected.
+         *
+         * @return true if the request is marked as successful when not connected.
+         */
+        public boolean succeedsWhenOffline() {
+            return false;
+        }
+
+        /**
+         * When the transport is not connected should this request automatically fail.
+         *
+         * @return true if the task should fail when the Provider is not connected.
+         */
+        public boolean failureWhenOffline() {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
new file mode 100644
index 0000000..8c63869
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+import org.apache.qpid.jms.util.URISupport.CompositeData;
+
+/**
+ * Factory for creating instances of the Failover Provider type.
+ */
+public class FailoverProviderFactory extends ProviderFactory {
+
+    @Override
+    public Provider createAsyncProvider(URI remoteURI) throws Exception {
+        CompositeData composite = URISupport.parseComposite(remoteURI);
+        Map<String, String> options = composite.getParameters();
+        Map<String, String> nested = PropertyUtil.filterProperties(options, "nested.");
+
+        FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested);
+        if (!PropertyUtil.setProperties(provider, options)) {
+            String msg = ""
+                + " Not all options could be set on the Failover provider."
+                + " Check the options are spelled correctly."
+                + " Given parameters=[" + options + "]."
+                + " This Provider cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
+
+        return provider;
+    }
+
+    @Override
+    public String getName() {
+        return "Failover";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
new file mode 100644
index 0000000..bb2d2b3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.qpid.jms.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the list of available failover URIs that are used to connect
+ * and recover a connection.
+ */
+public class FailoverUriPool {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverUriPool.class);
+
+    private final LinkedList<URI> uris;
+    private final Map<String, String> nestedOptions;
+    private boolean randomize;
+
+    public FailoverUriPool() {
+        this.uris = new LinkedList<URI>();
+        this.nestedOptions = Collections.emptyMap();
+    }
+
+    public FailoverUriPool(URI[] uris, Map<String, String> nestedOptions) {
+        this.uris = new LinkedList<URI>();
+        if (nestedOptions != null) {
+            this.nestedOptions = nestedOptions;
+        } else {
+            this.nestedOptions = Collections.emptyMap();
+        }
+
+        if (uris != null) {
+            for (URI uri : uris) {
+                this.add(uri);
+            }
+        }
+    }
+
+    /**
+     * Returns the next URI in the pool of URIs.  The URI will be shifted to the
+     * end of the list and not be attempted again until the full list has been
+     * returned once.
+     *
+     * @return the next URI that should be used for a connection attempt.
+     */
+    public URI getNext() {
+        URI next = null;
+        if (!uris.isEmpty()) {
+            next = uris.removeFirst();
+            uris.addLast(next);
+        }
+
+        return next;
+    }
+
+    /**
+     * Reports that the Failover Provider connected to the last URI returned from
+     * this pool.  If the Pool is set to randomize this will result in the Pool of
+     * URIs being shuffled in preparation for the next connect cycle.
+     */
+    public void connected() {
+        if (isRandomize()) {
+            Collections.shuffle(uris);
+        }
+    }
+
+    /**
+     * @return true if this pool returns the URI values in random order.
+     */
+    public boolean isRandomize() {
+        return randomize;
+    }
+
+    /**
+     * Sets whether the URIs that are returned by this pool are returned in random
+     * order or not.  If false the URIs are returned in FIFO order.
+     *
+     * @param randomize
+     *        true to have the URIs returned in a random order.
+     */
+    public void setRandomize(boolean randomize) {
+        this.randomize = randomize;
+        if (randomize) {
+            Collections.shuffle(uris);
+        }
+    }
+
+    /**
+     * Adds a new URI to the pool if not already contained within.  The URI will have
+     * any nest options that have been configured added to its existing set of options.
+     *
+     * @param uri
+     *        The new URI to add to the pool.
+     */
+    public void add(URI uri) {
+        if (!contains(uri)) {
+            if (!nestedOptions.isEmpty()) {
+                try {
+                    URISupport.applyParameters(uri, nestedOptions);
+                } catch (URISyntaxException e) {
+                    LOG.debug("Failed to add nested options to uri: {}", uri);
+                }
+            }
+
+            this.uris.add(uri);
+        }
+    }
+
+    /**
+     * Remove a URI from the pool if present, otherwise has no effect.
+     *
+     * @param uri
+     *        The URI to attempt to remove from the pool.
+     */
+    public void remove(URI uri) {
+        this.uris.remove(uri);
+    }
+
+    /**
+     * Returns the currently set value for nested options which will be added to each
+     * URI that is returned from the pool.
+     *
+     * @return the Map instance containing the nest options which can be empty if none set.
+     */
+    public Map<String, String> getNestedOptions() {
+        return nestedOptions;
+    }
+
+    private boolean contains(URI newURI) {
+        boolean result = false;
+        for (URI uri : uris) {
+            if (compareURIs(newURI, uri)) {
+                result = true;
+                break;
+            }
+        }
+
+        return result;
+    }
+
+    private boolean compareURIs(final URI first, final URI second) {
+        boolean result = false;
+        if (first == null || second == null) {
+            return result;
+        }
+
+        if (first.getPort() == second.getPort()) {
+            InetAddress firstAddr = null;
+            InetAddress secondAddr = null;
+            try {
+                firstAddr = InetAddress.getByName(first.getHost());
+                secondAddr = InetAddress.getByName(second.getHost());
+
+                if (firstAddr.equals(secondAddr)) {
+                    result = true;
+                }
+            } catch(IOException e) {
+                if (firstAddr == null) {
+                    LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
+                } else {
+                    LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
+                }
+
+                if (first.getHost().equalsIgnoreCase(second.getHost())) {
+                    result = true;
+                }
+            }
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java
new file mode 100644
index 0000000..d1972cd
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for SASL Authentication Mechanism that implements the basic
+ * methods of a Mechanism class.
+ */
+public abstract class AbstractMechanism implements Mechanism {
+
+    protected static final byte[] EMPTY = new byte[0];
+
+    private String username;
+    private String password;
+    private Map<String, Object> properties = new HashMap<String, Object>();
+
+    @Override
+    public int compareTo(Mechanism other) {
+
+        if (getPriority() < other.getPriority()) {
+            return -1;
+        } else if (getPriority() > other.getPriority()) {
+            return 1;
+        }
+
+        return 0;
+    }
+
+    @Override
+    public void setUsername(String value) {
+        this.username = value;
+    }
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public void setPassword(String value) {
+        this.password = value;
+    }
+
+    @Override
+    public String getPassword() {
+        return this.password;
+    }
+
+    @Override
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java
new file mode 100644
index 0000000..903a5fe
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+/**
+ * Implements the Anonymous SASL authentication mechanism.
+ */
+public class AnonymousMechanism extends AbstractMechanism {
+
+    @Override
+    public byte[] getInitialResponse() {
+        return EMPTY;
+    }
+
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) {
+        return EMPTY;
+    }
+
+    @Override
+    public int getPriority() {
+        return PRIORITY.LOWEST.getValue();
+    }
+
+    @Override
+    public String getName() {
+        return "ANONYMOUS";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java
new file mode 100644
index 0000000..28cd1f1
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+/**
+ * Create the Anonymous SASL Authentication Mechanism types.
+ */
+public class AnonymousMechanismFactory implements MechanismFactory {
+
+    @Override
+    public Mechanism createMechanism() {
+        return new AnonymousMechanism();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java
new file mode 100644
index 0000000..4b896ce
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+import java.io.UnsupportedEncodingException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.sasl.SaslException;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class CramMD5Mechanism extends AbstractMechanism {
+
+    private static final String ASCII = "ASCII";
+    private static final String HMACMD5 = "HMACMD5";
+    private boolean _sentResponse;
+
+    @Override
+    public int getPriority() {
+        return PRIORITY.HIGH.getValue();
+    }
+
+    @Override
+    public String getName() {
+        return "CRAM-MD5";
+    }
+
+    @Override
+    public byte[] getInitialResponse() {
+        return EMPTY;
+    }
+
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) throws SaslException {
+        if (!_sentResponse && challenge != null && challenge.length != 0) {
+            try {
+                SecretKeySpec key = new SecretKeySpec(getPassword().getBytes(ASCII), HMACMD5);
+                Mac mac = Mac.getInstance(HMACMD5);
+                mac.init(key);
+
+                byte[] bytes = mac.doFinal(challenge);
+
+                StringBuffer hash = new StringBuffer(getUsername());
+                hash.append(' ');
+                for (int i = 0; i < bytes.length; i++) {
+                    String hex = Integer.toHexString(0xFF & bytes[i]);
+                    if (hex.length() == 1) {
+                        hash.append('0');
+                    }
+                    hash.append(hex);
+                }
+
+                _sentResponse = true;
+                return hash.toString().getBytes(ASCII);
+            } catch (UnsupportedEncodingException e) {
+                throw new SaslException("Unable to utilise required encoding", e);
+            } catch (InvalidKeyException e) {
+                throw new SaslException("Unable to utilise key", e);
+            } catch (NoSuchAlgorithmException e) {
+                throw new SaslException("Unable to utilise required algorithm", e);
+            }
+        } else {
+            return EMPTY;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java
new file mode 100644
index 0000000..0ce5736
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+/**
+ * Create the Plain SASL Authentication Mechanism types.
+ */
+public class CramMD5MechanismFactory implements MechanismFactory {
+
+    @Override
+    public Mechanism createMechanism() {
+        return new CramMD5Mechanism();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java
new file mode 100644
index 0000000..4fbfcd5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+import java.util.Map;
+
+import javax.security.sasl.SaslException;
+
+/**
+ * Interface for all SASL authentication mechanism implementations.
+ */
+public interface Mechanism extends Comparable<Mechanism> {
+
+    /**
+     * Relative priority values used to arrange the found SASL
+     * mechanisms in a preferred order where the level of security
+     * generally defines the preference.
+     */
+    public enum PRIORITY {
+        LOWEST(0),
+        LOW(1),
+        MEDIUM(2),
+        HIGH(3),
+        HIGHEST(4);
+
+        private final int value;
+
+        private PRIORITY(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+       }
+    };
+
+    /**
+     * @return return the relative priority of this SASL mechanism.
+     */
+    int getPriority();
+
+    /**
+     * @return the well known name of this SASL mechanism.
+     */
+    String getName();
+
+    /**
+     * @return the response buffer used to answer the initial SASL cycle.
+     * @throws SaslException if an error occurs computing the response.
+     */
+    byte[] getInitialResponse() throws SaslException;
+
+    /**
+     * Create a response based on a given challenge from the remote peer.
+     *
+     * @param challenge
+     *        the challenge that this Mechanism should response to.
+     *
+     * @return the response that answers the given challenge.
+     * @throws SaslException if an error occurs computing the response.
+     */
+    byte[] getChallengeResponse(byte[] challenge) throws SaslException;
+
+    /**
+     * Sets the user name value for this Mechanism.  The Mechanism can ignore this
+     * value if it does not utilize user name in it's authentication processing.
+     *
+     * @param username
+     *        The user name given.
+     */
+    void setUsername(String value);
+
+    /**
+     * Returns the configured user name value for this Mechanism.
+     *
+     * @return the currently set user name value for this Mechanism.
+     */
+    String getUsername();
+
+    /**
+     * Sets the password value for this Mechanism.  The Mechanism can ignore this
+     * value if it does not utilize a password in it's authentication processing.
+     *
+     * @param username
+     *        The user name given.
+     */
+    void setPassword(String value);
+
+    /**
+     * Returns the configured password value for this Mechanism.
+     *
+     * @return the currently set password value for this Mechanism.
+     */
+    String getPassword();
+
+    /**
+     * Sets any additional Mechanism specific properties using a Map<String, Object>
+     *
+     * @param options
+     *        the map of additional properties that this Mechanism should utilize.
+     */
+    void setProperties(Map<String, Object> options);
+
+    /**
+     * The currently set Properties for this Mechanism.
+     *
+     * @return the current set of configuration Properties for this Mechanism.
+     */
+    Map<String, Object> getProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/MechanismFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/MechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/MechanismFactory.java
new file mode 100644
index 0000000..a47f38c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/MechanismFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+/**
+ * Interface for all SASL authentication mechanism implementations.
+ */
+public interface MechanismFactory {
+
+    /**
+     * Creates an instance of the authentication mechanism implementation.
+     *
+     * @return a new Mechanism instance.
+     */
+    Mechanism createMechanism();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java
new file mode 100644
index 0000000..b305e98
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class PlainMechanism extends AbstractMechanism {
+
+    @Override
+    public int getPriority() {
+        return PRIORITY.MEDIUM.getValue();
+    }
+
+    @Override
+    public String getName() {
+        return "PLAIN";
+    }
+
+    @Override
+    public byte[] getInitialResponse() {
+
+        String username = getUsername();
+        String password = getPassword();
+
+        if (username == null) {
+            username = "";
+        }
+
+        if (password == null) {
+            password = "";
+        }
+
+        byte[] usernameBytes = username.getBytes();
+        byte[] passwordBytes = password.getBytes();
+        byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2];
+        System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
+        System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);
+        return data;
+    }
+
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) {
+        return EMPTY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java
new file mode 100644
index 0000000..3cdd205
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+/**
+ * Create the Plain SASL Authentication Mechanism types.
+ */
+public class PlainMechanismFactory implements MechanismFactory {
+
+    @Override
+    public Mechanism createMechanism() {
+        return new PlainMechanism();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java
new file mode 100644
index 0000000..5bc0a94
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.sasl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to find a SASL Mechanism that most closely matches the preferred set
+ * of Mechanisms supported by the remote peer.
+ *
+ * The Matching mechanism is chosen by first find all instances of SASL
+ * mechanism types that are supported on the remote peer, and then making a
+ * final selection based on the Mechanism in the found set that has the
+ * highest priority value.
+ */
+public class SaslMechanismFinder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SaslMechanismFinder.class);
+
+    private static final FactoryFinder<MechanismFactory> MECHANISM_FACTORY_FINDER =
+        new FactoryFinder<MechanismFactory>(MechanismFactory.class,
+            "META-INF/services/" + SaslMechanismFinder.class.getPackage().getName().replace(".", "/") + "/");
+
+    /**
+     * Attempts to find a matching Mechanism implementation given a list of supported
+     * mechanisms from a remote peer.  Can return null if no matching Mechanisms are
+     * found.
+     *
+     * @param remoteMechanisms
+     *        list of mechanism names that are supported by the remote peer.
+     *
+     * @return the best matching Mechanism for the supported remote set.
+     */
+    public static Mechanism findMatchingMechanism(String...remoteMechanisms) {
+
+        Mechanism match = null;
+        List<Mechanism> found = new ArrayList<Mechanism>();
+
+        for (String remoteMechanism : remoteMechanisms) {
+            try {
+                MechanismFactory factory = findMechanismFactory(remoteMechanism);
+                found.add(factory.createMechanism());
+            } catch (IOException e) {
+                LOG.warn("Caught exception while searching for SASL mechanisms: {}", e.getMessage());
+            }
+        }
+
+        if (!found.isEmpty()) {
+            // Sorts by priority using Mechanism comparison and return the last value in
+            // list which is the Mechanism deemed to be the highest priority match.
+            Collections.sort(found);
+            match = found.get(found.size() - 1);
+        }
+
+        LOG.info("Best match for SASL auth was: {}", match);
+
+        return match;
+    }
+
+    /**
+     * Searches for a MechanismFactory by using the scheme from the given name.
+     *
+     * The search first checks the local cache of mechanism factories before moving on
+     * to search in the classpath.
+     *
+     * @param name
+     *        The name of the authentication mechanism to search for..
+     *
+     * @return a mechanism factory instance matching the URI's scheme.
+     *
+     * @throws IOException if an error occurs while locating the factory.
+     */
+    protected static MechanismFactory findMechanismFactory(String name) throws IOException {
+        if (name == null || name.isEmpty()) {
+            throw new IOException("No Mechanism name specified: [" + name + "]");
+        }
+
+        MechanismFactory factory = null;
+        try {
+            factory = MECHANISM_FACTORY_FINDER.newInstance(name);
+        } catch (Throwable e) {
+            throw new IOException("Mechanism scheme NOT recognized: [" + name + "]", e);
+        }
+
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
new file mode 100644
index 0000000..ae96ed7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.util.InetAddressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.buffer.Buffer;
+
+/**
+ *
+ */
+public class RawTcpTransport implements Transport, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RawTcpTransport.class);
+
+    private TransportListener listener;
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private final Socket socket;
+    private DataOutputStream dataOut;
+    private DataInputStream dataIn;
+    private Thread runner;
+
+    private boolean closeAsync = true;
+    private int socketBufferSize = 64 * 1024;
+    private int soTimeout = 0;
+    private int soLinger = Integer.MIN_VALUE;
+    private Boolean keepAlive;
+    private Boolean tcpNoDelay = true;
+    private boolean useLocalHost = false;
+    private int ioBufferSize = 8 * 1024;
+
+    /**
+     * Create a new instance of the transport.
+     *
+     * @param listener
+     *        The TransportListener that will receive data from this Transport instance.
+     * @param remoteLocation
+     *        The remote location where this transport should connection to.
+     */
+    public RawTcpTransport(TransportListener listener, URI remoteLocation) {
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+
+        Socket temp = null;
+        try {
+            temp = createSocketFactory().createSocket();
+        } catch (IOException e) {
+            connectionError.set(e);
+        }
+
+        this.socket = temp;
+    }
+
+    @Override
+    public void connect() throws IOException {
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+
+        if (socket == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress remoteAddress = null;
+
+        if (remoteLocation != null) {
+            String host = resolveHostName(remoteLocation.getHost());
+            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        }
+
+        socket.connect(remoteAddress);
+
+        connected.set(true);
+
+        initialiseSocket(socket);
+        initializeStreams();
+
+        runner = new Thread(null, this, "QpidJMS RawTcpTransport: " + toString());
+        runner.setDaemon(false);
+        runner.start();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            if (socket == null) {
+                return;
+            }
+
+            // Closing the streams flush the sockets before closing.. if the socket
+            // is hung.. then this hangs the close so we support an asynchronous close
+            // by default which will timeout if the close doesn't happen after a delay.
+            if (closeAsync) {
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                final ExecutorService closer = Executors.newSingleThreadExecutor();
+                closer.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.trace("Closing socket {}", socket);
+                        try {
+                            socket.close();
+                            LOG.debug("Closed socket {}", socket);
+                        } catch (IOException e) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
+                            }
+                        } finally {
+                            latch.countDown();
+                        }
+                    }
+                });
+
+                try {
+                    latch.await(1,TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    closer.shutdownNow();
+                }
+            } else {
+                LOG.trace("Closing socket {}", socket);
+                try {
+                    socket.close();
+                    LOG.debug("Closed socket {}", socket);
+                } catch (IOException e) {
+                    LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        LOG.info("RawTcpTransport sending packet of size: {}", output.remaining());
+        if (dataOut instanceof OutputStream) {
+            WritableByteChannel channel = Channels.newChannel(dataOut);
+            channel.write(output);
+        } else {
+            while (output.hasRemaining()) {
+                dataOut.writeByte(output.get());
+            }
+        }
+        dataOut.flush();
+    }
+
+    @Override
+    public void send(org.fusesource.hawtbuf.Buffer output) throws IOException {
+        checkConnected();
+        send(output.toByteBuffer());
+    }
+
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    public int getIoBufferSize() {
+        return ioBufferSize;
+    }
+
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+
+    public boolean isCloseAsync() {
+        return closeAsync;
+    }
+
+    public void setCloseAsync(boolean closeAsync) {
+        this.closeAsync = closeAsync;
+    }
+
+    //---------- Transport internal implementation ---------------------------//
+
+    @Override
+    public void run() {
+        LOG.trace("TCP consumer thread for " + this + " starting");
+        try {
+            while (isConnected()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            connectionError.set(e);
+            onException(e);
+        } catch (Throwable e) {
+            IOException ioe = new IOException("Unexpected error occured: " + e);
+            connectionError.set(ioe);
+            ioe.initCause(e);
+            onException(ioe);
+        }
+    }
+
+    protected void doRun() throws IOException {
+        int size = dataIn.available();
+        if (size <= 0) {
+            try {
+                TimeUnit.NANOSECONDS.sleep(1);
+            } catch (InterruptedException e) {
+            }
+            return;
+        }
+
+        byte[] buffer = new byte[size];
+        dataIn.readFully(buffer);
+        Buffer incoming = new Buffer(buffer);
+        listener.onData(incoming);
+    }
+
+    /**
+     * Passes any IO exceptions into the transport listener
+     */
+    public void onException(IOException e) {
+        if (listener != null) {
+            try {
+                listener.onTransportError(e);
+            } catch (RuntimeException e2) {
+                LOG.debug("Unexpected runtime exception: " + e2, e2);
+            }
+        }
+    }
+
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketFactory.getDefault();
+    }
+
+    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+        try {
+            sock.setReceiveBufferSize(socketBufferSize);
+            sock.setSendBufferSize(socketBufferSize);
+        } catch (SocketException se) {
+            LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
+            LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+        }
+
+        sock.setSoTimeout(soTimeout);
+
+        if (keepAlive != null) {
+            sock.setKeepAlive(keepAlive.booleanValue());
+        }
+
+        if (soLinger > -1) {
+            sock.setSoLinger(true, soLinger);
+        } else if (soLinger == -1) {
+            sock.setSoLinger(false, 0);
+        }
+
+        if (tcpNoDelay != null) {
+            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+        }
+    }
+
+    protected void initializeStreams() throws IOException {
+        try {
+            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+            this.dataIn = new DataInputStream(buffIn);
+            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+            this.dataOut = new DataOutputStream(outputStream);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    protected String resolveHostName(String host) throws UnknownHostException {
+        if (isUseLocalHost()) {
+            String localName = InetAddressUtil.getLocalHostName();
+            if (localName != null && localName.equals(host)) {
+                return "localhost";
+            }
+        }
+        return host;
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
new file mode 100644
index 0000000..49d250c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.vertx.java.core.net.NetClient;
+
+/**
+ * Provides SSL configuration to the Vert.x NetClient object used by the underling
+ * TCP based Transport.
+ */
+public class SslTransport extends TcpTransport {
+
+    private final JmsSslContext context;
+
+    /**
+     * Create an instance of the SSL transport
+     *
+     * @param listener
+     *        The TransportListener that will handle events from this Transport instance.
+     * @param remoteLocation
+     *        The location that is being connected to.
+     * @param JmsSslContext
+     *        The JMS Framework SslContext to use for this SSL connection.
+     */
+    public SslTransport(TransportListener listener, URI remoteLocation, JmsSslContext context) {
+        super(listener, remoteLocation);
+
+        this.context = context;
+    }
+
+    @Override
+    protected void configureNetClient(NetClient client) throws IOException {
+        super.configureNetClient(client);
+
+        client.setSSL(true);
+        client.setKeyStorePath(context.getKeyStoreLocation());
+        client.setKeyStorePassword(context.getKeyStorePassword());
+        client.setTrustStorePath(context.getTrustStoreLocation());
+        client.setTrustStorePassword(context.getTrustStorePassword());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message